Spaces:
Runtime error
Runtime error
| from algs.alg_functions_pibt import * | |
| from run_single_MAPF_func import run_mapf_alg | |
def run_pibt(
        start_nodes: List[Node],
        goal_nodes: List[Node],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        params: Dict
) -> Tuple[None, Dict] | Tuple[Dict[str, List[Node]], Dict]:
    """Plan a path for every agent by advancing all agents one step at a time.

    Each loop iteration builds the current configuration, calls
    ``run_procedure_pibt`` for every agent that has not yet been assigned a
    next node, appends the chosen nodes to the agents' paths, and updates
    priorities. The loop ends when every agent stands on its goal node after
    the same step.

    Args:
        start_nodes: Start node per agent, forwarded to ``create_agents``.
        goal_nodes: Goal node per agent, forwarded to ``create_agents``.
        nodes: Not used by this function; kept for the common solver signature.
        nodes_dict: Forwarded unchanged to ``run_procedure_pibt``.
        h_dict: Forwarded unchanged to ``run_procedure_pibt``
            (presumably per-goal heuristic tables -- confirm in that helper).
        map_dim: Not used by this function; kept for the common solver signature.
        params: Must contain ``'max_time'``, the wall-clock budget in seconds.

    Returns:
        ``(None, {})`` if the elapsed time exceeds ``params['max_time']`` once a
        step has been executed (this also applies to the step that reaches all
        goals). Otherwise ``(paths, info)`` where ``paths`` maps each agent name
        to its list of nodes and ``info`` holds ``'agents'``, ``'time'``
        (seconds) and ``'makespan'`` (number of executed steps).

    Raises:
        KeyError: If ``params`` has no ``'max_time'`` entry.
    """
    max_time: int | float = params['max_time']
    start_time = time.time()

    # create agents
    agents, agents_dict = create_agents(start_nodes, goal_nodes)
    n_agents = len(agents_dict)
    if not agents:
        # Nothing to plan. Without this guard the post-loop check below would
        # index agents[0] and raise IndexError.
        return {}, {'agents': agents, 'time': time.time() - start_time, 'makespan': 0}
    agents.sort(key=lambda a: a.priority, reverse=True)

    # Built once outside the f-string: reusing the enclosing quote character
    # inside an f-string replacement field is a SyntaxError before Python 3.12.
    banner = '*' * 10

    iteration = 0
    finished = False
    while not finished:
        config_from: Dict[str, Node] = {a.name: a.path[-1] for a in agents}
        occupied_from: Dict[str, AgentAlg] = {a.path[-1].xy_name: a for a in agents}
        config_to: Dict[str, Node] = {}
        occupied_to: Dict[str, AgentAlg] = {}

        # calc the step
        for agent in agents:
            # An earlier call may already have assigned this agent's next node.
            if agent.name not in config_to:
                run_procedure_pibt(
                    agent,
                    config_from, occupied_from,
                    config_to, occupied_to,
                    agents_dict, nodes_dict, h_dict, [])

        # execute the step + check the termination condition
        finished = True
        n_finished = 0
        for agent in agents:
            next_node = config_to[agent.name]
            agent.path.append(next_node)
            agent.prev_node = agent.curr_node
            agent.curr_node = next_node
            if agent.curr_node != agent.goal_node:
                finished = False
                agent.priority += 1
            else:
                # Agents standing on their goal fall back to their initial priority.
                agent.priority = agent.init_priority
                n_finished += 1

        # unfinished first
        agents.sort(key=lambda a: a.priority, reverse=True)

        # print + render
        runtime = time.time() - start_time
        print(
            f'\r{banner} | [PIBT] {iteration=: <3} | finished: {n_finished}/{n_agents: <3} '
            f'| runtime: {runtime: .2f} seconds | {banner}',
            end='')
        iteration += 1

        if runtime > max_time:
            return None, {}

    # checks
    # NOTE(review): judging by its name this validates vertex/edge conflicts
    # at each timestep -- confirm in algs.alg_functions_pibt.
    for i in range(len(agents[0].path)):
        check_vc_ec_neic_iter(agents, i, to_count=False)

    runtime = time.time() - start_time
    return {a.name: a.path for a in agents}, {'agents': agents, 'time': runtime, 'makespan': iteration}
def main():
    """Hand ``run_pibt`` to ``run_mapf_alg`` with a ``max_time`` of 100.

    ``run_pibt`` compares ``max_time`` against elapsed ``time.time()`` seconds.
    """
    run_mapf_alg(alg=run_pibt, params={'max_time': 100})


# Only launch the solver when this file is executed as a script.
if __name__ == '__main__':
    main()