MAPF_Solver / algs /alg_functions_PrP.py
ArseniyPerchik's picture
more
ad7641a
import heapq
import random
from globals import *
from functions import *
from functions_plotting import *
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# CLASSES
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# class AgentPrP:
# def __init__(self, num: int, start_node: Node, goal_node: Node):
# self.num = num
# self.name = f'agent_{num}'
# self.start_node: Node = start_node
# self.curr_node: Node = start_node
# self.goal_node: Node = goal_node
# self.path: List[Node] = []
# self.k_path: List[Node] | None = None
#
# @property
# def path_names(self):
# return [n.xy_name for n in self.path]
#
# def update_curr_node(self, i_time):
# if i_time >= len(self.path):
# self.curr_node = self.path[-1]
# return
# self.curr_node = self.path[i_time]
#
# def __str__(self):
# return self.name
#
# def __repr__(self):
# return self.name
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def create_prp_agents(
start_nodes: List[Node], goal_nodes: List[Node]
) -> Tuple[List[AgentAlg], Dict[str, AgentAlg]]:
agents: List[AgentAlg] = []
agents_dict: Dict[str, AgentAlg] = {}
for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes)):
new_agent = AgentAlg(num, s_node, g_node)
agents.append(new_agent)
agents_dict[new_agent.name] = new_agent
return agents, agents_dict
def create_hard_and_soft_constraints(
h_priority_agents: List[AgentAlg],
map_dim: Tuple[int, int],
constr_type: str,
flag_k_limit: bool = False,
):
assert constr_type in ['hard', 'soft']
if len(h_priority_agents) == 0:
max_path_len = 1
vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len)
vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len)
# vc_hard_np, ec_hard_np, pc_hard_np = None, None, None
# vc_soft_np, ec_soft_np, pc_soft_np = None, None, None
return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np
if flag_k_limit:
max_path_len = max([len(a.k_path) for a in h_priority_agents])
paths = [a.k_path for a in h_priority_agents]
else:
max_path_len = max([len(a.path) for a in h_priority_agents])
paths = [a.path for a in h_priority_agents]
if constr_type == 'hard':
vc_hard_np, ec_hard_np, pc_hard_np = create_constraints(paths, map_dim, max_path_len)
vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len)
# vc_soft_np, ec_soft_np, pc_soft_np = None, None, None
elif constr_type == 'soft':
vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len)
# vc_hard_np, ec_hard_np, pc_hard_np = None, None, None
vc_soft_np, ec_soft_np, pc_soft_np = create_constraints(paths, map_dim, max_path_len)
else:
raise RuntimeError('nope')
return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np
def solution_is_found(agents: List[AgentAlg]):
for agent in agents:
if agent.path is None:
return False
if len(agent.path) == 0:
return False
if agent.path[-1] != agent.goal_node:
return False
return True
def get_shuffled_agents(agents: List[AgentAlg]) -> List[AgentAlg]:
agents_copy = agents[:]
random.shuffle(agents_copy)
unfinished: List[AgentAlg] = [a for a in agents_copy if len(a.path) == 0 or a.path[-1] != a.goal_node]
finished: List[AgentAlg] = [a for a in agents_copy if len(a.path) > 0 and a.path[-1] == a.goal_node]
return [*unfinished, *finished]