Spaces:
Runtime error
Runtime error
| from algs.alg_functions_PrP import * | |
| from algs.alg_sipps import run_sipps | |
| from algs.alg_temporal_a_star import run_temporal_a_star | |
| from algs.alg_sipps_functions import init_si_table, update_si_table_soft, update_si_table_hard | |
| from run_single_MAPF_func import run_mapf_alg | |
def run_prp_sipps(
    start_nodes: List[Node],
    goal_nodes: List[Node],
    nodes: List[Node],
    nodes_dict: Dict[str, Node],
    h_dict: Dict[str, np.ndarray],
    map_dim: Tuple[int, int],
    params: Dict,
) -> Tuple[Dict[str, List[Node]] | None, dict]:
    """
    Prioritized planning with random restarts, using SIPPS for single-agent search.

    One agent is created per (start, goal) pair. Agents are planned one by one
    in the current order; every planned path is written into the safe-interval
    table and into the edge-constraint table selected by ``constr_type`` so
    that later agents plan around it. If some agent gets no path, the ordering
    is abandoned, the agents are shuffled and planning starts over.

    :param start_nodes: start node of every agent.
    :param goal_nodes: goal node of every agent (paired with ``start_nodes``).
    :param nodes: passed through to ``init_si_table`` and ``run_sipps``.
    :param nodes_dict: passed through to ``run_sipps``.
    :param h_dict: passed through to ``run_sipps``.
    :param map_dim: map dimensions, used to size the edge-constraint tables.
    :param params: requires ``'constr_type'`` (``'hard'`` or ``'soft'``),
        ``'alg_name'`` (label for the progress line) and ``'max_time'``
        (time budget in seconds).
    :return: ``({agent name: path}, info)`` with ``info`` holding ``'agents'``,
        ``'time'`` and ``'makespan'`` on success; ``(None, {})`` when the time
        budget runs out.
    """
    constr_type: str = params['constr_type']
    alg_name: str = params['alg_name']
    max_time: float = params['max_time']
    use_hard: bool = constr_type == 'hard'
    use_soft: bool = constr_type == 'soft'

    start_time = time.time()

    # one planning agent per start/goal pair
    agents = [
        AgentAlg(num, s_node, g_node)
        for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes))
    ]

    longest_len = 1
    ec_hard_np = init_ec_table(map_dim, longest_len)
    ec_soft_np = init_ec_table(map_dim, longest_len)

    r_iter = 0
    while time.time() - start_time < max_time:
        # every ordering attempt starts from clean tables
        if use_hard:
            ec_hard_np = init_ec_table(map_dim, longest_len)
        elif use_soft:
            ec_soft_np = init_ec_table(map_dim, longest_len)
        si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)

        h_priority_agents: List[AgentAlg] = []
        for agent in agents:
            new_path, _ = run_sipps(
                agent.start_node, agent.goal_node, nodes, nodes_dict, h_dict,
                None, ec_hard_np, None, None, ec_soft_np, None,
                agent=agent, si_table=si_table
            )
            if time.time() - start_time > max_time:
                return None, {}
            if new_path is None:
                # this ordering failed; fall through to the reshuffle below
                agent.path = None
                break

            agent.path = new_path[:]
            h_priority_agents.append(agent)
            align_all_paths(h_priority_agents)

            if use_hard:
                si_table = update_si_table_hard(new_path, si_table)
            elif use_soft:
                si_table = update_si_table_soft(new_path, si_table)

            if longest_len < len(new_path):
                # the horizon grew: rebuild both tables and re-register every
                # path planned so far
                longest_len = len(new_path)
                ec_hard_np = init_ec_table(map_dim, longest_len)
                ec_soft_np = init_ec_table(map_dim, longest_len)
                paths_to_register = [h_agent.path for h_agent in h_priority_agents]
            else:
                paths_to_register = [new_path]

            if use_hard or use_soft:
                active_ec_np = ec_hard_np if use_hard else ec_soft_np
                for path in paths_to_register:
                    update_ec_table(path, active_ec_np)

            # progress line, overwritten in place after every planned agent
            runtime = time.time() - start_time
            print(f'\r[{alg_name}] {r_iter=: <3} | agents: {len(h_priority_agents): <3} / {len(agents)} | {runtime= : .2f} s.', end='')

        if solution_is_found(agents):
            runtime = time.time() - start_time
            makespan: int = max([len(a.path) for a in agents])
            paths_dict = {a.name: a.path for a in agents}
            return paths_dict, {'agents': agents, 'time': runtime, 'makespan': makespan}

        # try another random priority ordering from scratch
        r_iter += 1
        random.shuffle(agents)
        for agent in agents:
            agent.path = []

    return None, {}
def run_prp_a_star(
    start_nodes: List[Node],
    goal_nodes: List[Node],
    nodes: List[Node],
    nodes_dict: Dict[str, Node],
    h_dict: Dict[str, np.ndarray],
    map_dim: Tuple[int, int],
    params: Dict,
) -> Tuple[Dict[str, List[Node]] | None, dict]:
    """
    Prioritized planning with random restarts, using temporal A* for single-agent search.

    One agent is created per (start, goal) pair. Agents are planned one by one
    in the current order; each planned path is written into the hard
    constraint tables so later agents must avoid it. The soft tables are
    passed to the planner but are never updated here. If some agent gets no
    path, the agents are shuffled and planning starts over with fresh tables.

    :param start_nodes: start node of every agent.
    :param goal_nodes: goal node of every agent (paired with ``start_nodes``).
    :param nodes: passed through to ``run_temporal_a_star``.
    :param nodes_dict: passed through to ``run_temporal_a_star``.
    :param h_dict: passed through to ``run_temporal_a_star``.
    :param map_dim: map dimensions, used to size the constraint tables.
    :param params: requires ``'alg_name'`` (label for the progress line) and
        ``'max_time'`` (time budget in seconds). No other key is read.
    :return: ``({agent name: path}, info)`` with ``info`` holding ``'agents'``,
        ``'time'`` and ``'makespan'`` on success; ``(None, {})`` when the time
        budget runs out.
    """
    alg_name: str = params['alg_name']
    max_time: float = params['max_time']
    # NOTE: params['to_render'] used to be read here although nothing in this
    # function renders; the lookup only made callers without the key crash.

    start_time = time.time()

    # one planning agent per start/goal pair
    agents = [
        AgentAlg(num, s_node, g_node)
        for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes))
    ]

    r_iter = 0
    while time.time() - start_time < max_time:
        # every ordering attempt starts from clean tables
        h_priority_agents: List[AgentAlg] = []
        longest_len = 1
        vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, longest_len)
        vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, longest_len)

        for agent in agents:
            new_path, _ = run_temporal_a_star(
                agent.start_node, agent.goal_node, nodes, nodes_dict, h_dict,
                vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np,
                agent=agent,
            )
            if time.time() - start_time > max_time:
                return None, {}
            if new_path is None:
                # this ordering failed; fall through to the reshuffle below
                agent.path = None
                break

            agent.path = new_path[:]
            h_priority_agents.append(agent)
            align_all_paths(h_priority_agents)

            if longest_len < len(new_path):
                # the horizon grew: rebuild the tables and re-register every
                # path planned so far
                longest_len = len(new_path)
                vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, longest_len)
                vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, longest_len)
                for h_agent in h_priority_agents:
                    update_constraints(h_agent.path, vc_hard_np, ec_hard_np, pc_hard_np)
            else:
                update_constraints(new_path, vc_hard_np, ec_hard_np, pc_hard_np)

            # progress line after every planned agent
            runtime = time.time() - start_time
            print(f'\r[{alg_name}] {r_iter=: <3} | agents: {len(h_priority_agents): <3} / {len(agents)} | {runtime= : .2f} s.')

        if solution_is_found(agents):
            runtime = time.time() - start_time
            makespan: int = max(len(a.path) for a in agents)
            return {a.name: a.path for a in agents}, {'agents': agents, 'time': runtime, 'makespan': makespan}

        # try another random priority ordering from scratch
        r_iter += 1
        random.shuffle(agents)
        for agent in agents:
            agent.path = []

    return None, {}
def run_k_prp(
    start_nodes: List[Node],
    goal_nodes: List[Node],
    nodes: List[Node],
    nodes_dict: Dict[str, Node],
    h_dict: Dict[str, np.ndarray],
    map_dim: Tuple[int, int],
    params: Dict,
) -> Tuple[Dict[str, List[Node]] | None, dict]:
    """
    Windowed prioritized planning: repeatedly plan ``k_limit`` steps for all agents.

    -> MAPF:
        - stop condition: all agents at their locations or time is up
        - behaviour, when agent is at its goal: the goal remains the same
        - output: success, time, makespan, soc
    LMAPF:
        - stop condition: the end of n iterations where every iteration has a time limit
        - behaviour, when agent is at its goal: agent receives a new goal
        - output: throughput

    In every iteration each agent plans from its current node with the given
    single-agent planner. The planned window is aligned to ``k_limit + 1``
    nodes and written into the hard constraint tables (and, for SIPPS, the
    safe-interval table). If every agent got a window, the windows (minus
    their first node) are appended to the agents' paths; otherwise the
    iteration contributes nothing. The agent order is then reshuffled.

    :param start_nodes: start node of every agent.
    :param goal_nodes: goal node of every agent (paired with ``start_nodes``).
    :param nodes: passed through to ``init_si_table`` and the planner.
    :param nodes_dict: passed through to the planner.
    :param h_dict: passed through to the planner.
    :param map_dim: map dimensions, used to size the constraint tables.
    :param params: requires ``'k_limit'``, ``'alg_name'``, ``'pf_alg'``
        (planner callable), ``'pf_alg_name'`` (``'sipps'`` or ``'a_star'``)
        and ``'max_time'`` (seconds). Optional: ``'to_render'`` (default
        ``False``) and ``'img_np'`` (only used when rendering).
    :return: ``({agent name: path}, info)`` with ``info`` holding ``'agents'``,
        ``'time'`` and ``'makespan'`` on success; ``(None, {})`` when the time
        budget runs out.
    :raises RuntimeError: if ``pf_alg_name`` is not a supported planner name.
    """
    k_limit: int = params['k_limit']
    alg_name: str = params['alg_name']
    pf_alg = params['pf_alg']
    pf_alg_name: str = params['pf_alg_name']
    max_time: float = params['max_time']
    # Rendering inputs are optional: callers that do not render need not supply them.
    to_render: bool = params.get('to_render', False)
    img_np = params.get('img_np')

    ax = None
    if to_render:
        _, ax = plt.subplots(1, 2, figsize=(14, 7))

    start_time = time.time()

    # one planning agent per start/goal pair
    agents: List[AgentAlg] = [
        AgentAlg(num, s_node, g_node)
        for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes))
    ]

    # main loop
    k_iter = 0
    while time.time() - start_time < max_time:
        si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)
        vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, k_limit + 1)
        vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, k_limit + 1)

        # calc k paths
        all_good: bool = True
        h_priority_agents: List[AgentAlg] = []
        for agent in agents:
            new_path, _ = pf_alg(
                agent.curr_node, agent.goal_node, nodes, nodes_dict, h_dict,
                vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np,
                flag_k_limit=True, k_limit=k_limit, agent=agent, si_table=si_table
            )
            if new_path is None:
                all_good = False
                break
            new_path = align_path(new_path, k_limit + 1)
            agent.k_path = new_path[:]
            h_priority_agents.append(agent)

            if pf_alg_name not in ('sipps', 'a_star'):
                raise RuntimeError(f'unsupported pf_alg_name: {pf_alg_name!r}')
            update_constraints(new_path, vc_hard_np, ec_hard_np, pc_hard_np)
            if pf_alg_name == 'sipps':
                si_table = update_si_table_hard(new_path, si_table, consider_pc=False)

        # reset k paths if not good, so the append step below adds nothing
        if not all_good:
            for agent in agents:
                agent.k_path = []

        if all_good and to_render:
            for i in range(k_limit):
                for a in agents:
                    a.curr_node = a.k_path[i]
                # plot the iteration
                plot_info = {
                    'img_np': img_np,
                    'agents': agents,
                    'i_agent': agents[0],
                }
                plot_step_in_env(ax[0], plot_info)
                plt.pause(0.001)

        # append paths (the first window node is skipped)
        for agent in agents:
            agent.path.extend(agent.k_path[1:])
            if agent.path:
                agent.curr_node = agent.path[-1]

        runtime = time.time() - start_time
        finished: List[AgentAlg] = [a for a in agents if a.path and a.path[-1] == a.goal_node]
        print(f'\r[{alg_name}] {k_iter=: <3} | agents: {len(finished): <3} / {len(agents)} | {runtime=: .2f} s.')

        # return check
        if solution_is_found(agents):
            runtime = time.time() - start_time
            makespan: int = max(len(a.path) for a in agents)
            return {a.name: a.path for a in agents}, {'agents': agents, 'time': runtime, 'makespan': makespan}

        # reshuffle
        k_iter += 1
        agents = get_shuffled_agents(agents)
        for agent in agents:
            agent.k_path = []

    return None, {}
def main():
    """Run k-PrP with SIPPS as the single-agent planner via ``run_mapf_alg``."""
    # final_render = True
    to_render = False

    # ----------------------------------------------------------------- #
    # Alternative configurations, kept for reference.
    # ----------------------------------------------------------------- #
    # PrP-A*
    # params_prp_a_star = {
    #     'max_time': 1000,
    #     'alg_name': f'PrP-A*',
    #     'constr_type': 'hard',
    #     'pf_alg': run_temporal_a_star,
    #     'final_render': final_render,
    # }
    # run_mapf_alg(alg=run_prp_a_star, params=params_prp_a_star)

    # PrP-SIPPS
    # params_prp_sipps = {
    #     'max_time': 1000,
    #     'alg_name': f'PrP-SIPPS',
    #     # 'constr_type': 'soft',
    #     'constr_type': 'hard',
    #     'pf_alg': run_sipps,
    #     'final_render': final_render,
    # }
    # run_mapf_alg(alg=run_prp_sipps, params=params_prp_sipps)

    # k-PrP - A*
    # params_k_prp_a_star = {
    #     'max_time': 1000,
    #     'alg_name': f'k-PrP-A*',
    #     'constr_type': 'hard',
    #     'k_limit': 5,
    #     'pf_alg_name': 'a_star',
    #     'pf_alg': run_temporal_a_star,
    #     'final_render': final_render,
    # }
    # run_mapf_alg(alg=run_k_prp, params=params_k_prp_a_star)

    # ----------------------------------------------------------------- #
    # Active configuration: k-PrP - SIPPS
    # ----------------------------------------------------------------- #
    params_k_prp_sipps = dict(
        max_time=1000,
        alg_name='k-PrP-SIPPS',
        constr_type='hard',
        k_limit=5,
        pf_alg_name='sipps',
        pf_alg=run_sipps,
        final_render=to_render,
    )
    run_mapf_alg(alg=run_k_prp, params=params_k_prp_sipps)
# Script entry point: run the configured solver only when executed directly.
if __name__ == "__main__":
    main()