Spaces:
Runtime error
Runtime error
| from babel.numbers import is_currency | |
| from globals import * | |
| from functions import * | |
| from functions_plotting import * | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # CLASSES | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # FUNCS | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # -------------------------------------------------------------------------------------------------------------------- # | |
| # def get_sorted_nei_nodes( | |
| # agent: AgentAlg, | |
| # config_from: Dict[str, Node], | |
| # # nodes_dict: Dict[str, Node], | |
| # h_dict: Dict[str, np.ndarray], | |
| # ): | |
| # h_goal_np: np.ndarray = h_dict[agent.get_goal_node().xy_name] | |
| # # sort C in ascending order of dist(u, gi) where u ∈ C | |
| # # nei_nodes: List[Node] = [nodes_dict[n_name] for n_name in config_from[agent.name].neighbours] | |
| # nei_nodes: List[Node] = config_from[agent.name].neighbours_nodes[:] | |
| # random.shuffle(nei_nodes) | |
| # | |
| # def get_nei_v(n: Node) -> float: | |
| # return float(h_goal_np[n.x, n.y]) | |
| # | |
| # nei_nodes.sort(key=get_nei_v) | |
| # return nei_nodes | |
| def get_sorted_nei_nodes( | |
| agent: AgentAlg, | |
| curr_node: Node, | |
| h_dict: Dict[str, np.ndarray], | |
| ): | |
| h_goal_np: np.ndarray = h_dict[agent.get_goal_node().xy_name] | |
| # sort C in ascending order of dist(u, gi) where u ∈ C | |
| # nei_nodes: List[Node] = [nodes_dict[n_name] for n_name in curr_node.neighbours] | |
| nei_nodes: List[Node] = curr_node.neighbours_nodes[:] | |
| random.shuffle(nei_nodes) | |
| def get_nei_v(n: Node) -> float: | |
| return float(h_goal_np[n.x, n.y]) | |
| nei_nodes.sort(key=get_nei_v) | |
| return nei_nodes | |
| def there_is_vc( | |
| nei_node: Node, | |
| config_to: Dict[str, Node], | |
| ) -> bool: | |
| for name, n in config_to.items(): | |
| if nei_node == n: | |
| return True | |
| return False | |
| def get_agent_k( | |
| nei_node: Node, | |
| occupied_from: Dict[str, AgentAlg], | |
| config_to: Dict[str, Node], | |
| ) -> AgentAlg | None: | |
| if nei_node.xy_name in occupied_from: | |
| other_agent = occupied_from[nei_node.xy_name] | |
| if other_agent.name not in config_to: | |
| return other_agent | |
| return None | |
| # for a_f_name, n_f_node in config_from.items(): | |
| # if n_f_node == nei_node and a_f_name not in config_to: | |
| # return agents_dict[a_f_name] | |
| # return None | |
| def there_is_ec( | |
| agent_i: AgentAlg, | |
| node_to: Node, | |
| config_from: Dict[str, Node], | |
| config_to: Dict[str, Node], | |
| ) -> bool: | |
| node_from = config_from[agent_i.name] | |
| for other_name, other_node_from in config_from.items(): | |
| if other_name == agent_i.name or other_name not in config_to: | |
| continue | |
| other_node_to = config_to[other_name] | |
| if other_node_from == node_to and other_node_to == node_from: | |
| return True | |
| return False | |
| def get_next_node(node: Node, blocked: List[Node]) -> Node | None: | |
| nei_nodes = node.neighbours_nodes[:] | |
| nei_nodes.remove(node) | |
| for n in blocked: | |
| nei_nodes.remove(n) | |
| if len(nei_nodes) == 0: | |
| return None | |
| return random.choice(nei_nodes) | |
| # swap_is_required = check_if_swap_required(agent_k, agent_i, i_curr_node, first_node, h_dict) | |
| def check_if_swap_required( | |
| agent_i: AgentAlg, | |
| agent_j: AgentAlg, | |
| i_curr_node: Node, | |
| j_curr_node: Node, | |
| # config_from: Dict[str, Node], | |
| h_dict: Dict[str, np.ndarray], | |
| ) -> bool: | |
| """ | |
| This is done by continuously moving i to j’s location while moving j to another vertex not equal to i’s location, | |
| ignoring the other agents. | |
| The emulation stops in two cases: | |
| (i) The swap is not required when j’s location has a degree of more than two. | |
| (ii) The swap is required when | |
| (1) j’s location has a degree of one, | |
| or, | |
| (2) when i reaches gi while j’s nearest neighboring vertex toward its goal is gi. | |
| """ | |
| # i_curr_node = config_from[agent_i.name] | |
| # j_curr_node = config_from[agent_j.name] | |
| i_goal_node = agent_i.get_goal_node() | |
| if len(j_curr_node.neighbours) > 3: | |
| return False | |
| while True: | |
| next_node_i = j_curr_node | |
| next_node_j = get_next_node(j_curr_node, blocked=[i_curr_node]) | |
| if next_node_j is None: | |
| return True | |
| if len(next_node_j.neighbours) > 3: | |
| return False | |
| if next_node_i == i_goal_node: | |
| nei_nodes_j = get_sorted_nei_nodes(agent_j, next_node_j, h_dict) | |
| nearest_nei_to_goal_j = nei_nodes_j[0] | |
| return nearest_nei_to_goal_j == i_goal_node | |
| i_curr_node = next_node_i | |
| j_curr_node = next_node_j | |
| def check_if_swap_possible( | |
| # agent_i: AgentAlg, | |
| # agent_j: AgentAlg, | |
| i_curr_node: Node, | |
| j_curr_node: Node, | |
| # config_from: Dict[str, Node], | |
| ) -> bool: | |
| """ | |
| This is done by reversing the emulation direction; that is, | |
| continuously moving j to i’s location while moving i to another vertex. | |
| It stops in two cases: | |
| (i) The swap is possible when i’s location has a degree of more than two. | |
| (ii) The swap is impossible when i is on a vertex with degree of one. | |
| :return: | |
| """ | |
| # i_curr_node = config_from[agent_i.name] | |
| # j_curr_node = config_from[agent_j.name] | |
| while True: | |
| next_node_j = i_curr_node | |
| next_node_i = get_next_node(i_curr_node, blocked=[j_curr_node]) | |
| if next_node_i is None: | |
| return False | |
| if len(next_node_i.neighbours) > 3: | |
| return True | |
| i_curr_node = next_node_i | |
| j_curr_node = next_node_j | |
| def swap_required_and_possible( | |
| agent_i: AgentAlg, | |
| first_node: Node, | |
| config_from: Dict[str, Node], | |
| config_to: Dict[str, Node], | |
| occupied_from: Dict[str, AgentAlg], | |
| h_dict: Dict[str, np.ndarray], | |
| with_swap: bool, | |
| iteration: int = 0, | |
| ) -> AgentAlg | None: | |
| # first_node_name = first_node.xy_name | |
| if not with_swap: | |
| return None | |
| i_curr_node = config_from[agent_i.name] | |
| if i_curr_node == first_node: | |
| return None | |
| # for the a and b cases | |
| if first_node.xy_name in occupied_from: | |
| agent_j: AgentAlg = occupied_from[first_node.xy_name] | |
| assert agent_j != agent_i | |
| if agent_j.name in config_to: | |
| return None | |
| # necessity of the swap | |
| j_curr_node = config_from[agent_j.name] | |
| # is_required = check_if_swap_required(agent_i, agent_j, config_from, h_dict) | |
| swap_is_required = check_if_swap_required(agent_i, agent_j, i_curr_node, j_curr_node, h_dict) | |
| # possibility of the swap | |
| i_curr_node = config_from[agent_i.name] | |
| j_curr_node = config_from[agent_j.name] | |
| swap_is_possible = check_if_swap_possible(i_curr_node, j_curr_node) | |
| # is_possible = check_if_swap_possible(j_curr_node, i_curr_node) | |
| if swap_is_required and swap_is_possible: | |
| return agent_j | |
| # for the c case | |
| i_curr_node = config_from[agent_i.name] | |
| i_nei_nodes = i_curr_node.neighbours_nodes | |
| for i_nei_node in i_nei_nodes: | |
| if i_nei_node == i_curr_node or i_nei_node.xy_name not in occupied_from or first_node == i_nei_node: | |
| continue | |
| agent_k: AgentAlg = occupied_from[i_nei_node.xy_name] | |
| swap_is_required = check_if_swap_required(agent_k, agent_i, i_curr_node, first_node, h_dict) | |
| swap_is_possible = check_if_swap_possible(i_curr_node, first_node) | |
| # swap_is_possible = check_if_swap_possible(first_node, i_curr_node) | |
| if swap_is_required and swap_is_possible: | |
| return agent_k | |
| return None | |
| def run_procedure_pibt( | |
| agent_i: AgentAlg, | |
| config_from: Dict[str, Node], | |
| occupied_from: Dict[str, AgentAlg], | |
| config_to: Dict[str, Node], | |
| occupied_to: Dict[str, AgentAlg], | |
| agents_dict: Dict[str, AgentAlg], | |
| nodes_dict: Dict[str, Node], | |
| h_dict: Dict[str, np.ndarray], | |
| blocked_nodes_names: List[str], | |
| iteration: int = 0, | |
| with_message: str = '', | |
| with_swap: bool = True | |
| # with_swap: bool = False | |
| ) -> bool: # valid or invalid | |
| agent_i.message += f'| [{iteration}-{with_message}] pibt |' | |
| # nei_nodes = get_sorted_nei_nodes(main_agent, config_from, nodes_dict, h_dict) | |
| nei_nodes = get_sorted_nei_nodes(agent_i, config_from[agent_i.name], h_dict) | |
| # j ← swap_required_and_possible | |
| agent_j = swap_required_and_possible(agent_i, nei_nodes[0], config_from, config_to, occupied_from, h_dict, with_swap, iteration=iteration) | |
| if agent_j is not None: | |
| nei_nodes.reverse() | |
| for iter_nei_n, nei_node in enumerate(nei_nodes): | |
| if nei_node.xy_name in occupied_to: | |
| continue | |
| node_from = config_from[agent_i.name] | |
| if node_from.xy_name in occupied_to: | |
| other_agent = occupied_to[node_from.xy_name] | |
| if other_agent != agent_i and config_from[other_agent.name] == nei_node: | |
| continue | |
| if nei_node.xy_name in blocked_nodes_names: | |
| continue | |
| config_to[agent_i.name] = nei_node | |
| occupied_to[nei_node.xy_name] = agent_i | |
| agent_k = get_agent_k(nei_node, occupied_from, config_to) | |
| if agent_k is not None and agent_k != agent_i: | |
| valid = run_procedure_pibt( | |
| agent_k, | |
| config_from, occupied_from, | |
| config_to, occupied_to, | |
| agents_dict, nodes_dict, h_dict, blocked_nodes_names, iteration, with_message, with_swap | |
| ) | |
| if not valid: | |
| continue | |
| if with_swap and iter_nei_n == 0 and agent_j is not None and agent_j.name not in config_to: | |
| i_node_from = config_from[agent_i.name] | |
| if i_node_from.xy_name not in occupied_to: | |
| config_to[agent_j.name] = i_node_from | |
| occupied_to[i_node_from.xy_name] = agent_j | |
| return True | |
| node_from = config_from[agent_i.name] | |
| config_to[agent_i.name] = node_from | |
| occupied_to[node_from.xy_name] = agent_i | |
| return False | |