MAPF_Solver / algs /alg_functions_pibt.py
ArseniyPerchik's picture
more
ad7641a
from babel.numbers import is_currency
from globals import *
from functions import *
from functions_plotting import *
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# CLASSES
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# def get_sorted_nei_nodes(
# agent: AgentAlg,
# config_from: Dict[str, Node],
# # nodes_dict: Dict[str, Node],
# h_dict: Dict[str, np.ndarray],
# ):
# h_goal_np: np.ndarray = h_dict[agent.get_goal_node().xy_name]
# # sort C in ascending order of dist(u, gi) where u ∈ C
# # nei_nodes: List[Node] = [nodes_dict[n_name] for n_name in config_from[agent.name].neighbours]
# nei_nodes: List[Node] = config_from[agent.name].neighbours_nodes[:]
# random.shuffle(nei_nodes)
#
# def get_nei_v(n: Node) -> float:
# return float(h_goal_np[n.x, n.y])
#
# nei_nodes.sort(key=get_nei_v)
# return nei_nodes
def get_sorted_nei_nodes(
agent: AgentAlg,
curr_node: Node,
h_dict: Dict[str, np.ndarray],
):
h_goal_np: np.ndarray = h_dict[agent.get_goal_node().xy_name]
# sort C in ascending order of dist(u, gi) where u ∈ C
# nei_nodes: List[Node] = [nodes_dict[n_name] for n_name in curr_node.neighbours]
nei_nodes: List[Node] = curr_node.neighbours_nodes[:]
random.shuffle(nei_nodes)
def get_nei_v(n: Node) -> float:
return float(h_goal_np[n.x, n.y])
nei_nodes.sort(key=get_nei_v)
return nei_nodes
def there_is_vc(
nei_node: Node,
config_to: Dict[str, Node],
) -> bool:
for name, n in config_to.items():
if nei_node == n:
return True
return False
def get_agent_k(
nei_node: Node,
occupied_from: Dict[str, AgentAlg],
config_to: Dict[str, Node],
) -> AgentAlg | None:
if nei_node.xy_name in occupied_from:
other_agent = occupied_from[nei_node.xy_name]
if other_agent.name not in config_to:
return other_agent
return None
# for a_f_name, n_f_node in config_from.items():
# if n_f_node == nei_node and a_f_name not in config_to:
# return agents_dict[a_f_name]
# return None
def there_is_ec(
agent_i: AgentAlg,
node_to: Node,
config_from: Dict[str, Node],
config_to: Dict[str, Node],
) -> bool:
node_from = config_from[agent_i.name]
for other_name, other_node_from in config_from.items():
if other_name == agent_i.name or other_name not in config_to:
continue
other_node_to = config_to[other_name]
if other_node_from == node_to and other_node_to == node_from:
return True
return False
def get_next_node(node: Node, blocked: List[Node]) -> Node | None:
nei_nodes = node.neighbours_nodes[:]
nei_nodes.remove(node)
for n in blocked:
nei_nodes.remove(n)
if len(nei_nodes) == 0:
return None
return random.choice(nei_nodes)
# swap_is_required = check_if_swap_required(agent_k, agent_i, i_curr_node, first_node, h_dict)
def check_if_swap_required(
agent_i: AgentAlg,
agent_j: AgentAlg,
i_curr_node: Node,
j_curr_node: Node,
# config_from: Dict[str, Node],
h_dict: Dict[str, np.ndarray],
) -> bool:
"""
This is done by continuously moving i to j’s location while moving j to another vertex not equal to i’s location,
ignoring the other agents.
The emulation stops in two cases:
(i) The swap is not required when j’s location has a degree of more than two.
(ii) The swap is required when
(1) j’s location has a degree of one,
or,
(2) when i reaches gi while j’s nearest neighboring vertex toward its goal is gi.
"""
# i_curr_node = config_from[agent_i.name]
# j_curr_node = config_from[agent_j.name]
i_goal_node = agent_i.get_goal_node()
if len(j_curr_node.neighbours) > 3:
return False
while True:
next_node_i = j_curr_node
next_node_j = get_next_node(j_curr_node, blocked=[i_curr_node])
if next_node_j is None:
return True
if len(next_node_j.neighbours) > 3:
return False
if next_node_i == i_goal_node:
nei_nodes_j = get_sorted_nei_nodes(agent_j, next_node_j, h_dict)
nearest_nei_to_goal_j = nei_nodes_j[0]
return nearest_nei_to_goal_j == i_goal_node
i_curr_node = next_node_i
j_curr_node = next_node_j
def check_if_swap_possible(
# agent_i: AgentAlg,
# agent_j: AgentAlg,
i_curr_node: Node,
j_curr_node: Node,
# config_from: Dict[str, Node],
) -> bool:
"""
This is done by reversing the emulation direction; that is,
continuously moving j to i’s location while moving i to another vertex.
It stops in two cases:
(i) The swap is possible when i’s location has a degree of more than two.
(ii) The swap is impossible when i is on a vertex with degree of one.
:return:
"""
# i_curr_node = config_from[agent_i.name]
# j_curr_node = config_from[agent_j.name]
while True:
next_node_j = i_curr_node
next_node_i = get_next_node(i_curr_node, blocked=[j_curr_node])
if next_node_i is None:
return False
if len(next_node_i.neighbours) > 3:
return True
i_curr_node = next_node_i
j_curr_node = next_node_j
def swap_required_and_possible(
agent_i: AgentAlg,
first_node: Node,
config_from: Dict[str, Node],
config_to: Dict[str, Node],
occupied_from: Dict[str, AgentAlg],
h_dict: Dict[str, np.ndarray],
with_swap: bool,
iteration: int = 0,
) -> AgentAlg | None:
# first_node_name = first_node.xy_name
if not with_swap:
return None
i_curr_node = config_from[agent_i.name]
if i_curr_node == first_node:
return None
# for the a and b cases
if first_node.xy_name in occupied_from:
agent_j: AgentAlg = occupied_from[first_node.xy_name]
assert agent_j != agent_i
if agent_j.name in config_to:
return None
# necessity of the swap
j_curr_node = config_from[agent_j.name]
# is_required = check_if_swap_required(agent_i, agent_j, config_from, h_dict)
swap_is_required = check_if_swap_required(agent_i, agent_j, i_curr_node, j_curr_node, h_dict)
# possibility of the swap
i_curr_node = config_from[agent_i.name]
j_curr_node = config_from[agent_j.name]
swap_is_possible = check_if_swap_possible(i_curr_node, j_curr_node)
# is_possible = check_if_swap_possible(j_curr_node, i_curr_node)
if swap_is_required and swap_is_possible:
return agent_j
# for the c case
i_curr_node = config_from[agent_i.name]
i_nei_nodes = i_curr_node.neighbours_nodes
for i_nei_node in i_nei_nodes:
if i_nei_node == i_curr_node or i_nei_node.xy_name not in occupied_from or first_node == i_nei_node:
continue
agent_k: AgentAlg = occupied_from[i_nei_node.xy_name]
swap_is_required = check_if_swap_required(agent_k, agent_i, i_curr_node, first_node, h_dict)
swap_is_possible = check_if_swap_possible(i_curr_node, first_node)
# swap_is_possible = check_if_swap_possible(first_node, i_curr_node)
if swap_is_required and swap_is_possible:
return agent_k
return None
def run_procedure_pibt(
agent_i: AgentAlg,
config_from: Dict[str, Node],
occupied_from: Dict[str, AgentAlg],
config_to: Dict[str, Node],
occupied_to: Dict[str, AgentAlg],
agents_dict: Dict[str, AgentAlg],
nodes_dict: Dict[str, Node],
h_dict: Dict[str, np.ndarray],
blocked_nodes_names: List[str],
iteration: int = 0,
with_message: str = '',
with_swap: bool = True
# with_swap: bool = False
) -> bool: # valid or invalid
agent_i.message += f'| [{iteration}-{with_message}] pibt |'
# nei_nodes = get_sorted_nei_nodes(main_agent, config_from, nodes_dict, h_dict)
nei_nodes = get_sorted_nei_nodes(agent_i, config_from[agent_i.name], h_dict)
# j ← swap_required_and_possible
agent_j = swap_required_and_possible(agent_i, nei_nodes[0], config_from, config_to, occupied_from, h_dict, with_swap, iteration=iteration)
if agent_j is not None:
nei_nodes.reverse()
for iter_nei_n, nei_node in enumerate(nei_nodes):
if nei_node.xy_name in occupied_to:
continue
node_from = config_from[agent_i.name]
if node_from.xy_name in occupied_to:
other_agent = occupied_to[node_from.xy_name]
if other_agent != agent_i and config_from[other_agent.name] == nei_node:
continue
if nei_node.xy_name in blocked_nodes_names:
continue
config_to[agent_i.name] = nei_node
occupied_to[nei_node.xy_name] = agent_i
agent_k = get_agent_k(nei_node, occupied_from, config_to)
if agent_k is not None and agent_k != agent_i:
valid = run_procedure_pibt(
agent_k,
config_from, occupied_from,
config_to, occupied_to,
agents_dict, nodes_dict, h_dict, blocked_nodes_names, iteration, with_message, with_swap
)
if not valid:
continue
if with_swap and iter_nei_n == 0 and agent_j is not None and agent_j.name not in config_to:
i_node_from = config_from[agent_i.name]
if i_node_from.xy_name not in occupied_to:
config_to[agent_j.name] = i_node_from
occupied_to[i_node_from.xy_name] = agent_j
return True
node_from = config_from[agent_i.name]
config_to[agent_i.name] = node_from
occupied_to[node_from.xy_name] = agent_i
return False