Spaces:
Runtime error
Runtime error
| import heapq | |
| import random | |
| from globals import * | |
| from functions import * | |
| from functions_plotting import * | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # CLASSES | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # class AgentPrP: | |
| # def __init__(self, num: int, start_node: Node, goal_node: Node): | |
| # self.num = num | |
| # self.name = f'agent_{num}' | |
| # self.start_node: Node = start_node | |
| # self.curr_node: Node = start_node | |
| # self.goal_node: Node = goal_node | |
| # self.path: List[Node] = [] | |
| # self.k_path: List[Node] | None = None | |
| # | |
| # @property | |
| # def path_names(self): | |
| # return [n.xy_name for n in self.path] | |
| # | |
| # def update_curr_node(self, i_time): | |
| # if i_time >= len(self.path): | |
| # self.curr_node = self.path[-1] | |
| # return | |
| # self.curr_node = self.path[i_time] | |
| # | |
| # def __str__(self): | |
| # return self.name | |
| # | |
| # def __repr__(self): | |
| # return self.name | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # FUNCS | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| def create_prp_agents( | |
| start_nodes: List[Node], goal_nodes: List[Node] | |
| ) -> Tuple[List[AgentAlg], Dict[str, AgentAlg]]: | |
| agents: List[AgentAlg] = [] | |
| agents_dict: Dict[str, AgentAlg] = {} | |
| for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes)): | |
| new_agent = AgentAlg(num, s_node, g_node) | |
| agents.append(new_agent) | |
| agents_dict[new_agent.name] = new_agent | |
| return agents, agents_dict | |
| def create_hard_and_soft_constraints( | |
| h_priority_agents: List[AgentAlg], | |
| map_dim: Tuple[int, int], | |
| constr_type: str, | |
| flag_k_limit: bool = False, | |
| ): | |
| assert constr_type in ['hard', 'soft'] | |
| if len(h_priority_agents) == 0: | |
| max_path_len = 1 | |
| vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len) | |
| vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len) | |
| # vc_hard_np, ec_hard_np, pc_hard_np = None, None, None | |
| # vc_soft_np, ec_soft_np, pc_soft_np = None, None, None | |
| return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np | |
| if flag_k_limit: | |
| max_path_len = max([len(a.k_path) for a in h_priority_agents]) | |
| paths = [a.k_path for a in h_priority_agents] | |
| else: | |
| max_path_len = max([len(a.path) for a in h_priority_agents]) | |
| paths = [a.path for a in h_priority_agents] | |
| if constr_type == 'hard': | |
| vc_hard_np, ec_hard_np, pc_hard_np = create_constraints(paths, map_dim, max_path_len) | |
| vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len) | |
| # vc_soft_np, ec_soft_np, pc_soft_np = None, None, None | |
| elif constr_type == 'soft': | |
| vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len) | |
| # vc_hard_np, ec_hard_np, pc_hard_np = None, None, None | |
| vc_soft_np, ec_soft_np, pc_soft_np = create_constraints(paths, map_dim, max_path_len) | |
| else: | |
| raise RuntimeError('nope') | |
| return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np | |
| def solution_is_found(agents: List[AgentAlg]): | |
| for agent in agents: | |
| if agent.path is None: | |
| return False | |
| if len(agent.path) == 0: | |
| return False | |
| if agent.path[-1] != agent.goal_node: | |
| return False | |
| return True | |
| def get_shuffled_agents(agents: List[AgentAlg]) -> List[AgentAlg]: | |
| agents_copy = agents[:] | |
| random.shuffle(agents_copy) | |
| unfinished: List[AgentAlg] = [a for a in agents_copy if len(a.path) == 0 or a.path[-1] != a.goal_node] | |
| finished: List[AgentAlg] = [a for a in agents_copy if len(a.path) > 0 and a.path[-1] == a.goal_node] | |
| return [*unfinished, *finished] | |