MAPF_Solver / algs /alg_functions_lacam.py
ArseniyPerchik's picture
more
ad7641a
from globals import *
from functions import *
from functions_plotting import *
from algs.alg_mapf_pibt import run_procedure_pibt
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# CLASSES
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
class LowLevelNode:
def __init__(self,
parent: Self | None,
who: AgentAlg | None,
where: Node | None,
depth: int):
self.parent: Self | None = parent
self.who: AgentAlg | None = who
self.where: Node | None = where
self.depth = depth
self.who_list: List[AgentAlg] = []
self.where_list: List[Node] = []
def __str__(self):
return f'~ depth={self.depth}, who_list={self.who_list}, where_list={self.where_list} ~'
def __repr__(self):
return f'~ depth={self.depth}, who_list={self.who_list}, where_list={self.where_list} ~'
class HighLevelNode:
def __init__(
self,
config: Dict[str, Node],
tree: Deque[LowLevelNode],
order: List[AgentAlg],
parent: Self | None,
finished: int = 0,
i: int = -1,
):
self.config: Dict[str, Node] = config
self.tree: Deque[LowLevelNode] = tree
self.order: List[AgentAlg] = order
self.parent: Self | None = parent
self.finished: int = finished
self.i: int = i
self.name = get_config_name(self.config)
def __eq__(self, other: Self):
return self.name == other.name
def __str__(self):
return f'i={self.i}, {self.name}'
def __repr__(self):
return f'i={self.i}, {self.name}'
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# HELP FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def get_C_init():
return LowLevelNode(None, None, None, 0)
def get_C_child(
parent: LowLevelNode,
who: AgentAlg,
where: Node
) -> LowLevelNode:
C_new = LowLevelNode(parent=parent, who=who, where=where, depth=parent.depth + 1)
C_new.who_list = parent.who_list + [who]
C_new.where_list = parent.where_list + [where]
return C_new
# def get_h_value(a: AgentAlg) -> float:
# curr_node = config[a.name]
# h_map = h_dict[a.goal_node.xy_name]
# res: float = float(h_map[curr_node.x, curr_node.y])
# return res
def get_init_order(agents: List[AgentAlg]) -> List[AgentAlg]:
out_list: List[AgentAlg] = agents[:]
out_list.sort(key=lambda a: a.priority, reverse=True)
return out_list
def get_order(
config_new: Dict[str, Node],
N: HighLevelNode,
) -> Tuple[List[AgentAlg], int]:
out_list: List[AgentAlg] = N.order[:]
finished = 0
for i in out_list:
if config_new[i.name] == i.goal_node:
i.priority = i.init_priority
finished += 1
else:
i.priority += 1
out_list.sort(key=lambda a: a.priority, reverse=True)
return out_list, finished
def get_config_name(config: Dict[str, Node]):
assert len(config) > 0
k_list = list(config.keys())
k_list.sort()
name = ''
for k in k_list:
v = config[k]
name += v.xy_name + '-'
return name[:-1]
def backtrack(N: HighLevelNode) -> Dict[str, List[Node]]:
paths_deque_dict: Dict[str, Deque[Node]] = {k: deque([v]) for k, v in N.config.items()}
parent: HighLevelNode = N.parent
while parent is not None:
for k, v in parent.config.items():
paths_deque_dict[k].appendleft(v)
parent = parent.parent
paths_dict: Dict[str, List[Node]] = {}
for k, v in paths_deque_dict.items():
paths_dict[k] = list(v)
return paths_dict
def get_new_config(
N: HighLevelNode,
C: LowLevelNode,
agents_dict: Dict[str, AgentAlg],
nodes_dict: Dict[str, Node],
h_dict: Dict[str, np.ndarray],
) -> Dict[str, Node] | None:
# setup next configuration
config_from: Dict[str, Node] = N.config
occupied_from: Dict[str, AgentAlg] = {v.xy_name: agents_dict[k] for k, v in N.config.items()}
config_to: Dict[str, Node] = {}
occupied_to: Dict[str, AgentAlg] = {}
for k in range(C.depth):
agent = C.who_list[k]
node = C.where_list[k]
config_to[agent.name] = node
# vc
if node.xy_name in occupied_to:
return None
occupied_to[node.xy_name] = agent
# ec
if node.xy_name in occupied_from:
other_agent = occupied_from[node.xy_name]
if other_agent != agent and other_agent.name in config_to and config_to[other_agent.name] == config_from[agent.name]:
return None
# apply PIBT
for agent in N.order:
if agent.name not in config_to:
success = run_procedure_pibt(
agent,
config_from, occupied_from,
config_to, occupied_to,
agents_dict, nodes_dict, h_dict, [], with_swap=False)
if not success:
return None
return config_to