MAPF_Solver / algs /alg_mapf_pibt.py
ArseniyPerchik's picture
more
ad7641a
from algs.alg_functions_pibt import *
from run_single_MAPF_func import run_mapf_alg
def run_pibt(
        start_nodes: List[Node],
        goal_nodes: List[Node],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        params: Dict
) -> Tuple[None, Dict] | Tuple[Dict[str, List[Node]], Dict]:
    """
    Solve a MAPF instance with PIBT, planning one joint step per iteration.

    In every iteration the agents are visited in descending priority order and
    `run_procedure_pibt` is called for each agent that has not yet been given
    a next node in `config_to`. The chosen nodes are then appended to the
    agents' paths. An agent that is not on its goal gets its priority raised
    by one; an agent on its goal has its priority reset to `init_priority`.
    The loop stops as soon as every agent stands on its goal after a step.

    :param start_nodes: start nodes, passed to `create_agents`.
    :param goal_nodes: goal nodes, passed to `create_agents`.
    :param nodes: not used by this function (presumably kept so that all
        solvers share one signature - confirm against `run_mapf_alg`).
    :param nodes_dict: forwarded unchanged to `run_procedure_pibt`.
    :param h_dict: forwarded unchanged to `run_procedure_pibt`.
    :param map_dim: not used by this function.
    :param params: must contain 'max_time'; it is compared against elapsed
        `time.time()` differences, i.e. it is expressed in seconds.
    :return: `(None, {})` when the elapsed time measured at the end of an
        iteration exceeds 'max_time'; otherwise `(paths, info)` where `paths`
        maps agent name -> list of nodes and `info` holds the keys 'agents',
        'time' and 'makespan' (the number of executed iterations).
    """
    max_time: int | float = params['max_time']
    start_time = time.time()

    # create agents
    agents, agents_dict = create_agents(start_nodes, goal_nodes)
    n_agents = len(agents_dict)
    agents.sort(key=lambda a: a.priority, reverse=True)

    # built once and reused by the progress line; keeping it outside the
    # f-string avoids nested same-type quotes, which only parse on 3.12+
    stars = '*' * 10

    iteration = 0
    finished = False
    while not finished:

        config_from: Dict[str, Node] = {a.name: a.path[-1] for a in agents}
        occupied_from: Dict[str, AgentAlg] = {a.path[-1].xy_name: a for a in agents}
        config_to: Dict[str, Node] = {}
        occupied_to: Dict[str, AgentAlg] = {}

        # calc the step
        for agent in agents:
            # an agent may already have received its next node while an
            # earlier (higher-priority) agent was being processed
            if agent.name not in config_to:
                run_procedure_pibt(
                    agent,
                    config_from, occupied_from,
                    config_to, occupied_to,
                    agents_dict, nodes_dict, h_dict, [])

        # execute the step + check the termination condition
        finished = True
        n_finished = 0
        for agent in agents:
            next_node = config_to[agent.name]
            agent.path.append(next_node)
            agent.prev_node = agent.curr_node
            agent.curr_node = next_node
            if agent.curr_node != agent.goal_node:
                finished = False
                agent.priority += 1
            else:
                agent.priority = agent.init_priority
                n_finished += 1

        # unfinished first
        agents.sort(key=lambda a: a.priority, reverse=True)

        # print + render
        runtime = time.time() - start_time
        print(
            f'\r{stars} | [PIBT] {iteration=: <3} | '
            f'finished: {n_finished}/{n_agents: <3} | '
            f'runtime: {runtime: .2f} seconds | {stars}',
            end='')
        iteration += 1

        # The deadline is only looked at after a complete step, so a step is
        # never interrupted, and a step that ends past the deadline is
        # reported as a timeout even if it was the finishing one.
        if runtime > max_time:
            return None, {}

    # checks
    # every agent's path grows by exactly one node per iteration, so the first
    # agent's path length is the common length; with no agents there is
    # nothing to validate (and agents[0] would raise IndexError)
    path_len = len(agents[0].path) if agents else 0
    for i in range(path_len):
        check_vc_ec_neic_iter(agents, i, to_count=False)
    runtime = time.time() - start_time
    return {a.name: a.path for a in agents}, {'agents': agents, 'time': runtime, 'makespan': iteration}
@use_profiler(save_dir='../stats/alg_pibt.pstat')
def main():
    """
    Run the PIBT solver through the shared `run_mapf_alg` driver.

    The solver is given a 'max_time' of 100. The function is wrapped by
    `use_profiler` with save_dir '../stats/alg_pibt.pstat' (presumably the
    location where profiling statistics are written - confirm in
    `use_profiler`).
    """
    run_mapf_alg(alg=run_pibt, params={'max_time': 100})
# Script entry point: start the PIBT run only when this file is executed
# directly, not when it is imported by another module.
if __name__ == "__main__":
    main()