MAPF_Solver / algs /alg_sipps.py
ArseniyPerchik's picture
more
ad7641a
from algs.alg_sipps_functions import *
def run_sipps_insert_node(
node: SIPPSNode,
Q: List[SIPPSNode],
P: List[SIPPSNode],
ident_dict: DefaultDict[str, List[SIPPSNode]],
goal_node: Node,
goal_np: np.ndarray,
T: int,
T_tag: int,
ec_soft_np: np.ndarray, # x, y, x, y, t -> bool (0/1)
si_table: Dict[str, List[Tuple[int, int, str]]],
agent=None,
) -> None:
compute_c_g_h_f_values(node, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
identical_nodes = get_identical_nodes(node, Q, P, ident_dict)
for q in identical_nodes:
if q.low <= node.low and q.c <= node.c:
return
elif node.low <= q.low and node.c <= q.c:
if q in Q:
Q.remove(q)
ident_dict[q.ident_str].remove(q)
if q in P:
P.remove(q)
ident_dict[q.ident_str].remove(q)
elif node.low < q.high and q.low < node.high:
if node.low < q.low:
node.set_high(q.low)
else:
q.set_high(node.low)
heapq.heappush(Q, node)
ident_dict[node.ident_str].append(node)
return
def run_sipps_expand_node(
node: SIPPSNode,
nodes_dict: Dict[str, Node],
Q: List[SIPPSNode],
P: List[SIPPSNode],
ident_dict: DefaultDict[str, List[SIPPSNode]],
si_table: Dict[str, List[Tuple[int, int, str]]],
goal_node: Node,
goal_np: np.ndarray,
T: int,
T_tag: int,
ec_hard_np: np.ndarray, # x, y, x, y, t -> bool (0/1)
ec_soft_np: np.ndarray, # x, y, x, y, t -> bool (0/1)
agent=None
):
I_group: List[Tuple[Node, int]] = get_I_group(node, nodes_dict, si_table, agent)
# I_group_names = [(v.xy_name, i, si_table[v.xy_name][i]) for v, i in I_group]
for v_node, si_id in I_group:
init_low, init_high, init_type = si_table[v_node.xy_name][si_id]
new_low = get_low_without_hard_ec(node, node.n, v_node, init_low, init_high, ec_hard_np, agent)
if new_low is None:
continue
new_low_tag = get_low_without_hard_and_soft_ec(node, node.n, v_node, new_low, init_high, ec_hard_np, ec_soft_np)
if new_low_tag is not None and new_low < new_low_tag < init_high:
n_1 = SIPPSNode(v_node, (new_low, new_low_tag, init_type), si_id, False, parent=node)
run_sipps_insert_node(n_1, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
n_2 = SIPPSNode(v_node, (new_low_tag, init_high, init_type), si_id, False, parent=node)
run_sipps_insert_node(n_2, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
else:
n_3 = SIPPSNode(v_node, (new_low, init_high, init_type), si_id, False, parent=node)
run_sipps_insert_node(n_3, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
def run_sipps(
start_node: Node,
goal_node: Node,
nodes: List[Node],
nodes_dict: Dict[str, Node],
h_dict: Dict[str, np.ndarray],
vc_hard_np: np.ndarray | None, # x, y, t -> bool (0/1)
ec_hard_np: np.ndarray | None, # x, y, x, y, t -> bool (0/1)
pc_hard_np: np.ndarray | None, # x, y -> time (int)
vc_soft_np: np.ndarray | None, # x, y, t -> bool (0/1)
ec_soft_np: np.ndarray | None, # x, y, x, y, t -> bool (0/1)
pc_soft_np: np.ndarray | None, # x, y -> time (int)
inf_num: int = int(1e10),
max_final_time: int = int(1e10),
flag_k_limit: bool = False,
k_limit: int = int(1e10),
agent=None,
si_table: Dict[str, List[Tuple[int, int, str]]] | None = None,
**kwargs,
) -> Tuple[List[Node] | None, dict]:
T = get_T(goal_node, si_table)
if T >= inf_num:
return None, {}
root = SIPPSNode(start_node, si_table[start_node.xy_name][0], 0, False)
# goal_vc_times_list = get_vc_list(goal_node, vc_hard_np)
# if len(goal_vc_times_list) > 0:
# T = max(goal_vc_times_list) + 1
T_tag = get_T_tag(goal_node, si_table)
goal_np: np.ndarray = h_dict[goal_node.xy_name]
compute_c_g_h_f_values(root, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
Q: List[SIPPSNode] = []
P: List[SIPPSNode] = []
ident_dict: DefaultDict[str, List[SIPPSNode]] = defaultdict(lambda: [])
heapq.heappush(Q, root)
ident_dict[root.ident_str].append(root)
while len(Q) > 0:
# print(f'\r{len(Q)=}, {len(P)=}', end='')
next_n: SIPPSNode = heapq.heappop(Q)
if next_n.is_goal or next_n.low >= k_limit:
nodes_path, sipps_path = extract_path(next_n, agent=agent)
sipps_path_names = [n.to_print() for n in sipps_path]
return nodes_path, {
'T': T, 'T_tag': T_tag, 'Q': Q, 'P': P, 'si_table': si_table, 'r_type': 'is_goal',
'sipps_path': sipps_path, 'sipps_path_names': sipps_path_names, 'c': next_n.c,
}
if next_n.n == goal_node and next_n.low >= T:
c_future = get_c_future(goal_node, next_n.low, si_table)
if c_future == 0:
nodes_path, sipps_path = extract_path(next_n, agent=agent)
# nodes_path_names = [n.xy_name for n in nodes_path]
sipps_path_names = [n.to_print() for n in sipps_path]
return nodes_path, {
'T': T, 'T_tag': T_tag, 'Q': Q, 'P': P, 'si_table': si_table, 'r_type': 'c_future=0',
'sipps_path': sipps_path, 'sipps_path_names': sipps_path_names, 'c': next_n.c,
}
n_tag = duplicate_sipps_node(next_n)
n_tag.is_goal = True
n_tag.c += c_future
run_sipps_insert_node(n_tag, Q, P, ident_dict, goal_node, goal_np, T, T_tag, ec_soft_np, si_table)
run_sipps_expand_node(next_n, nodes_dict, Q, P, ident_dict, si_table, goal_node, goal_np, T, T_tag,
ec_hard_np, ec_soft_np, agent)
heapq.heappush(P, next_n)
ident_dict[next_n.ident_str].append(next_n)
return None, {
'T': T, 'T_tag': T_tag, 'Q': Q, 'P': P, 'si_table': si_table,
}
@use_profiler(save_dir='../stats/alg_sipps.pstat')
def main():
# set_seed(random_seed_bool=False, seed=7310)
# set_seed(random_seed_bool=False, seed=123)
set_seed(random_seed_bool=True)
# img_dir = '10_10_my_rand.map'
img_dir = 'empty-32-32.map'
# img_dir = 'random-32-32-10.map'
# img_dir = 'random-32-32-20.map'
# img_dir = 'room-32-32-4.map'
# img_dir = 'maze-32-32-2.map'
# img_dir = 'maze-32-32-4.map'
to_render: bool = True
# final_render: bool = False
path_to_maps: str = '../maps'
path_to_heuristics: str = '../logs_for_heuristics'
img_np, (height, width) = get_np_from_dot_map(img_dir, path_to_maps)
map_dim = (height, width)
nodes, nodes_dict = build_graph_from_np(img_np, show_map=False)
h_dict = exctract_h_dict(img_dir, path_to_heuristics)
path1 = [
nodes_dict['7_1'],
nodes_dict['7_1'],
nodes_dict['6_1'],
nodes_dict['5_1'],
nodes_dict['4_1'],
]
path2 = [
nodes_dict['6_0'],
nodes_dict['6_0'],
# nodes_dict['6_0'],
# nodes_dict['6_0'],
# nodes_dict['6_1'],
# nodes_dict['6_1'],
# nodes_dict['6_1'],
# nodes_dict['6_0'],
]
h_paths = [path1, path2]
max_path_len = max(map(lambda x: len(x), h_paths))
vc_hard_np, ec_hard_np, pc_hard_np = create_constraints(h_paths, map_dim, max_path_len)
path = [
nodes_dict['7_2'],
nodes_dict['7_2'],
nodes_dict['6_2'],
nodes_dict['5_2'],
nodes_dict['4_2'],
]
s_paths = [path]
vc_soft_np, ec_soft_np, pc_soft_np = create_constraints(s_paths, map_dim, max_path_len)
start_node = nodes_dict['6_2']
goal_node = nodes_dict['25_25']
result, info = run_sipps(
start_node, goal_node, nodes, nodes_dict, h_dict,
vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np
)
# plot
if to_render:
print(f'{[n.xy_name for n in result]}')
plot_np = np.ones(img_np.shape) * -2
# nodes
for n in nodes:
plot_np[n.x, n.y] = 0
# hard
for h_path in h_paths:
for n in h_path:
plot_np[n.x, n.y] = -1
# soft
for s_path in s_paths:
for n in s_path:
plot_np[n.x, n.y] = -0.5
# result
for n in result:
plot_np[n.x, n.y] = 1
plt.imshow(plot_np, cmap='binary', origin='lower')
plt.show()
if __name__ == '__main__':
main()
# if vc_hard_np is not None and vc_soft_np is not None:
# assert vc_hard_np.shape[2] == vc_soft_np.shape[2]
# assert ec_hard_np.shape[4] == ec_soft_np.shape[4]
# assert int(max(np.max(pc_hard_np), np.max(pc_soft_np))) + 1 == vc_hard_np.shape[2]
# assert int(max(np.max(pc_hard_np), np.max(pc_soft_np))) + 1 == ec_hard_np.shape[4]
# if pc_hard_np is not None:
# assert pc_hard_np[goal_node.x, goal_node.y] == -1