Spaces:
Runtime error
Runtime error
| from globals import * | |
| from functions import * | |
| from functions_plotting import * | |
| from algs.alg_mapf_pibt import run_procedure_pibt | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # CLASSES | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
class LowLevelNode:
    """Node holding one (who, where) pair plus a parent link and a depth.

    ``who_list`` and ``where_list`` always start out empty; the constructor
    never fills them, so callers that need them populated must assign them
    after construction.
    """

    def __init__(self,
                 parent: Self | None,
                 who: AgentAlg | None,
                 where: Node | None,
                 depth: int):
        self.depth = depth
        self.parent: Self | None = parent
        self.who: AgentAlg | None = who
        self.where: Node | None = where
        # Both lists are created fresh per instance (never shared).
        self.who_list: List[AgentAlg] = []
        self.where_list: List[Node] = []

    def _describe(self):
        # Single source for the text used by both str() and repr().
        return f'~ depth={self.depth}, who_list={self.who_list}, where_list={self.where_list} ~'

    def __repr__(self):
        return LowLevelNode._describe(self)

    def __str__(self):
        return LowLevelNode._describe(self)
class HighLevelNode:
    """Search node wrapping one configuration (a mapping of agent name to Node).

    The node's ``name`` is computed once, at construction, by
    ``get_config_name(config)``. Equality and hashing are both based on that
    name, so nodes can be compared with ``==`` and also stored in sets or
    used as dict keys.

    Parameters are stored as attributes unchanged:
    ``config``, ``tree``, ``order``, ``parent``, ``finished`` (default 0) and
    ``i`` (default -1).
    """

    def __init__(
            self,
            config: Dict[str, Node],
            tree: Deque[LowLevelNode],
            order: List[AgentAlg],
            parent: Self | None,
            finished: int = 0,
            i: int = -1,
    ):
        self.config: Dict[str, Node] = config
        self.tree: Deque[LowLevelNode] = tree
        self.order: List[AgentAlg] = order
        self.parent: Self | None = parent
        self.finished: int = finished
        self.i: int = i
        # Cached once; later mutation of ``config`` does not update it.
        self.name = get_config_name(self.config)

    def __eq__(self, other: object):
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of raising AttributeError on foreign operands.
        if not isinstance(other, HighLevelNode):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None (unhashable); keep the
        # hash consistent with equality by hashing the same key.
        return hash(self.name)

    def __str__(self):
        return f'i={self.i}, {self.name}'

    def __repr__(self):
        return f'i={self.i}, {self.name}'
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # HELP FUNCS | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
def get_C_init():
    """Return a fresh LowLevelNode with no parent, no who, no where and depth 0."""
    return LowLevelNode(parent=None, who=None, where=None, depth=0)
def get_C_child(
        parent: LowLevelNode,
        who: AgentAlg,
        where: Node
) -> LowLevelNode:
    """Create a child of ``parent`` that additionally records (who, where).

    The child is one level deeper than ``parent`` and its ``who_list`` /
    ``where_list`` are new lists: the parent's entries followed by the new
    ``who`` / ``where``. The parent's own lists are not modified.
    """
    child_depth = parent.depth + 1
    child = LowLevelNode(parent, who, where, child_depth)
    # Build new lists rather than appending in place, so siblings created
    # from the same parent never share state.
    child.who_list = [*parent.who_list, who]
    child.where_list = [*parent.where_list, where]
    return child
| # def get_h_value(a: AgentAlg) -> float: | |
| # curr_node = config[a.name] | |
| # h_map = h_dict[a.goal_node.xy_name] | |
| # res: float = float(h_map[curr_node.x, curr_node.y]) | |
| # return res | |
def get_init_order(agents: List[AgentAlg]) -> List[AgentAlg]:
    """Return a new list of ``agents`` ordered by descending ``priority``.

    The input list is left untouched. The sort is stable, so agents with
    equal priority keep their relative input order.
    """
    return sorted(agents, key=lambda agent: agent.priority, reverse=True)
def get_order(
        config_new: Dict[str, Node],
        N: HighLevelNode,
) -> Tuple[List[AgentAlg], int]:
    """Update agent priorities against ``config_new`` and return the new order.

    For every agent in ``N.order``: if its entry in ``config_new`` equals its
    ``goal_node`` the priority is reset to ``init_priority`` and the agent is
    counted as finished; otherwise its priority is incremented by one.
    NOTE: the agent objects are mutated in place.

    Returns a tuple ``(agents sorted by descending priority, finished count)``.
    ``N.order`` itself is not reordered.
    """
    reached_goal = 0
    for agent in N.order:
        at_goal = config_new[agent.name] == agent.goal_node
        if at_goal:
            agent.priority = agent.init_priority
            reached_goal += 1
        else:
            agent.priority += 1
    # Sort only after all priorities are final; stable for equal priorities.
    ranked: List[AgentAlg] = sorted(N.order, key=lambda agent: agent.priority, reverse=True)
    return ranked, reached_goal
def get_config_name(config: Dict[str, Node]):
    """Build a string name for a configuration.

    The ``xy_name`` of each value is taken in sorted-key order and the pieces
    are joined with ``'-'``. ``config`` must be non-empty (checked with an
    ``assert``).
    """
    assert len(config) > 0
    return '-'.join(config[key].xy_name for key in sorted(config))
def backtrack(N: HighLevelNode) -> Dict[str, List[Node]]:
    """Collect, per agent name, the sequence of nodes from the root down to ``N``.

    Starts from ``N.config`` and follows ``parent`` links until ``None``,
    prepending each ancestor's config entry to the matching sequence.
    Returns a dict mapping each key of ``N.config`` to a plain list whose
    last element is the value from ``N.config``.
    """
    # Deques make prepending cheap while walking towards the root.
    history: Dict[str, Deque[Node]] = {}
    for name, node in N.config.items():
        history[name] = deque([node])

    ancestor: HighLevelNode = N.parent
    while ancestor is not None:
        for name, node in ancestor.config.items():
            history[name].appendleft(node)
        ancestor = ancestor.parent

    return {name: list(steps) for name, steps in history.items()}
def get_new_config(
        N: HighLevelNode,
        C: LowLevelNode,
        agents_dict: Dict[str, AgentAlg],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
) -> Dict[str, Node] | None:
    """Build the next configuration from ``N`` under the constraints held in ``C``.

    The first ``C.depth`` entries of ``C.who_list`` / ``C.where_list`` are
    applied as fixed assignments. The result is ``None`` when:
      * two of those assignments target the same node (the "vc" check), or
      * an agent moves onto the node another agent held in ``N.config``
        while that other agent has already been assigned this agent's
        previous node, i.e. the two would swap (the "ec" check), or
      * ``run_procedure_pibt`` reports failure for a remaining agent.

    Every agent in ``N.order`` that has no assignment yet is handed to
    ``run_procedure_pibt`` (called with an empty list and ``with_swap=False``).
    Otherwise the mapping of agent name to next node is returned.
    """
    # Current step: where every agent is, and who stands on each node.
    config_from: Dict[str, Node] = N.config
    occupied_from: Dict[str, AgentAlg] = {}
    for name, node in N.config.items():
        occupied_from[node.xy_name] = agents_dict[name]

    # Next step: filled in below.
    config_to: Dict[str, Node] = {}
    occupied_to: Dict[str, AgentAlg] = {}

    for idx in range(C.depth):
        agent, target = C.who_list[idx], C.where_list[idx]
        config_to[agent.name] = target
        spot = target.xy_name

        # vc: the target node is already claimed for the next step
        if spot in occupied_to:
            return None
        occupied_to[spot] = agent

        # ec: only relevant when somebody stood on the target node before
        if spot not in occupied_from:
            continue
        previous = occupied_from[spot]
        swaps_places = (
            previous != agent
            and previous.name in config_to
            and config_to[previous.name] == config_from[agent.name]
        )
        if swaps_places:
            return None

    # apply PIBT to every agent the constraints left unassigned
    for agent in N.order:
        if agent.name in config_to:
            continue
        planned = run_procedure_pibt(
            agent,
            config_from, occupied_from,
            config_to, occupied_to,
            agents_dict, nodes_dict, h_dict, [], with_swap=False)
        if not planned:
            return None
    return config_to