# MAPF_Solver / algs / alg_functions_LNS2.py
# (repository web-page residue kept for reference: "ArseniyPerchik's picture", "more", "ad7641a")
import random
from globals import *
from functions import *
from functions_plotting import *
from algs.alg_sipps import run_sipps
from algs.alg_sipps_functions import init_si_table, update_si_table_soft
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# CLASSES
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# class AgentLNS2:
# def __init__(self, num: int, start_node: Node, goal_node: Node):
# self.num = num
# self.name = f'agent_{num}'
# self.start_node: Node = start_node
# self.start_node_name: str = self.start_node.xy_name
# self.curr_node: Node = start_node
# self.curr_node_name: str = self.curr_node.xy_name
# self.goal_node: Node = goal_node
# self.goal_node_name: str = self.goal_node.xy_name
# self.path: List[Node] | None = []
# self.k_path: List[Node] | None = None
#
# @property
# def path_names(self):
# return [n.xy_name for n in self.path]
#
# def update_curr_node(self, i_time):
# if i_time >= len(self.path):
# self.curr_node = self.path[-1]
# return
# self.curr_node = self.path[i_time]
#
# def __lt__(self, other):
# return self.num < other.num
#
# def __hash__(self):
# return hash(self.num)
#
# def __eq__(self, other):
# return self.num == other.num
#
# def __str__(self):
# return self.name
#
# def __repr__(self):
# return self.name
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def create_lns_agents(
        start_nodes: List[Node], goal_nodes: List[Node]
) -> Tuple[List[AgentAlg], Dict[str, AgentAlg]]:
    """
    Build one AgentAlg per (start, goal) pair.

    Agents are numbered by their position in the zipped inputs; if the two
    lists differ in length, the surplus entries of the longer one are ignored
    (plain `zip` semantics).

    :return: the agents in creation order, and the same agents keyed by their `name`.
    """
    agents: List[AgentAlg] = [
        AgentAlg(idx, start, goal)
        for idx, (start, goal) in enumerate(zip(start_nodes, goal_nodes))
    ]
    agents_dict: Dict[str, AgentAlg] = {agent.name: agent for agent in agents}
    return agents, agents_dict
def solution_is_found(agents: List[AgentAlg]):
    """
    Tell whether every agent currently holds a path that ends at its goal.

    An agent counts as unsolved when its `path` is None, is empty, or ends on
    a node different from its `goal_node`. An empty `agents` list yields True.
    """

    def _arrived(agent) -> bool:
        steps = agent.path
        if steps is None or len(steps) == 0:
            return False
        # same `!=` comparison as before, just negated
        return not (steps[-1] != agent.goal_node)

    return all(_arrived(agent) for agent in agents)
def get_shuffled_agents(agents: List[AgentAlg]) -> List[AgentAlg]:
    """
    Return a randomly ordered copy of `agents` with the unfinished agents first.

    An agent is "finished" when it has a non-empty path whose last node equals
    its `goal_node`; every other agent (including one whose `path` is None or
    empty) is "unfinished". Inside each group the shuffled order is kept.
    The input list itself is not modified.
    """
    shuffled = agents[:]  # work on a copy so the caller's order is untouched
    random.shuffle(shuffled)

    unfinished: List[AgentAlg] = []
    finished: List[AgentAlg] = []
    for agent in shuffled:
        path = agent.path
        # `path` may be None after a failed planning attempt; previously this
        # crashed on len(None), now such an agent is simply unfinished.
        at_goal = path is not None and len(path) > 0 and path[-1] == agent.goal_node
        (finished if at_goal else unfinished).append(agent)
    return [*unfinished, *finished]
def create_init_solution(
        agents: List[AgentAlg],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        constr_type: str,
        start_time: int | float,
        params: dict
):
    """
    Plan an initial path for every agent, one agent at a time, with `run_sipps`.

    Already planned agents influence later ones only through the soft edge
    table and the `si_table`; the hard edge table is created here and never
    written to. Results are stored in `agent.path`; nothing is returned.

    If `run_sipps` returns no path for an agent, that agent's `path` is set to
    None and planning stops - agents later in the list are left untouched.

    `constr_type` is accepted but not used by this function.
    `params` must provide the key 'alg_name' (shown in the progress line).
    """
    alg_name: str = params['alg_name']
    c_sum: int = 0  # running sum of sipps_info['c']; accumulated but not used further here
    planned: List[AgentAlg] = []
    horizon = 1  # length passed to init_ec_table (presumably the time dimension - confirm)
    ec_hard_np = init_ec_table(map_dim, horizon)
    ec_soft_np = init_ec_table(map_dim, horizon)
    si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)

    for agent in agents:
        new_path, sipps_info = run_sipps(
            agent.start_node, agent.goal_node, nodes, nodes_dict, h_dict,
            None, ec_hard_np, None, None, ec_soft_np, None,
            agent=agent, si_table=si_table
        )
        if new_path is None:
            agent.path = None
            break

        agent.path = new_path[:]
        planned.append(agent)
        align_all_paths(planned)
        c_sum += sipps_info['c']
        si_table = update_si_table_soft(new_path, si_table)

        new_len = len(new_path)
        if new_len <= horizon:
            # the current tables are kept - only the new path is registered
            update_ec_table(new_path, ec_soft_np)
        else:
            # a longer path arrived: recreate both tables for the new length and
            # register every planned path (the new one included) again
            horizon = new_len
            ec_hard_np = init_ec_table(map_dim, horizon)
            ec_soft_np = init_ec_table(map_dim, horizon)
            for planned_agent in planned:
                update_ec_table(planned_agent.path, ec_soft_np)

        # progress line, rewritten in place thanks to '\r' and end=''
        runtime = time.time() - start_time
        print(f'\r[{alg_name} - init] | agents: {len(planned): <3} / {len(agents)} | {runtime= : .2f} s.',
              end='')
def solve_subset_with_prp(
        agents_subset: List[AgentAlg],
        outer_agents: List[AgentAlg],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        start_time: int | float,
        # constr_type: str = 'hard',
        constr_type: str = 'soft',
        agents: List[AgentAlg] | None = None
) -> None:
    """
    Re-plan the agents of `agents_subset` one by one with `run_sipps`.

    The paths of `outer_agents`, and of subset agents already re-planned in
    this call, are registered in the soft edge table and the `si_table` that
    are handed to `run_sipps`; the hard edge table stays empty.

    Side effects:
      - `agents_subset` is shuffled in place;
      - every re-planned agent receives a new `path`;
      - if `run_sipps` returns no path, that agent's `path` is set to None and
        the loop stops, so the remaining subset agents keep their old paths.

    :param agents: all agents (subset + outer). When omitted, it is derived as
        `outer_agents + agents_subset`.
    :param constr_type: accepted for interface compatibility; not used here.
    """
    if agents is None:
        # None is the advertised default, but the code below iterates `agents`;
        # the two given lists are expected to partition it (see the assert).
        agents = [*outer_agents, *agents_subset]

    c_sum: int = 0  # running sum of sipps_info['c']; accumulated but not used further here
    h_priority_agents: List[AgentAlg] = outer_agents[:]
    si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)
    # Agents without a path (None, e.g. after a failed attempt) are skipped, and
    # an empty selection falls back to length 1 instead of raising.
    longest_len = max((len(a.path) for a in agents if a.path is not None), default=1)
    ec_hard_np = init_ec_table(map_dim, longest_len)
    ec_soft_np = init_ec_table(map_dim, longest_len)
    for h_agent in h_priority_agents:
        update_ec_table(h_agent.path, ec_soft_np)
        si_table = update_si_table_soft(h_agent.path, si_table)

    random.shuffle(agents_subset)
    for agent in agents_subset:
        new_path, sipps_info = run_sipps(
            agent.start_node, agent.goal_node, nodes, nodes_dict, h_dict,
            None, ec_hard_np, None, None, ec_soft_np, None,
            agent=agent, si_table=si_table
        )
        if new_path is None:
            agent.path = None
            break
        agent.path = new_path[:]
        h_priority_agents.append(agent)
        align_all_paths(h_priority_agents)
        c_sum += sipps_info['c']
        si_table = update_si_table_soft(new_path, si_table)

        if longest_len < len(new_path):
            # a longer path arrived: recreate both tables for the new length and
            # register every higher-priority path (the new one included) again
            longest_len = len(new_path)
            ec_hard_np = init_ec_table(map_dim, longest_len)
            ec_soft_np = init_ec_table(map_dim, longest_len)
            for h_agent in h_priority_agents:
                update_ec_table(h_agent.path, ec_soft_np)
        else:
            update_ec_table(new_path, ec_soft_np)

        # checks
        runtime = time.time() - start_time
        assert len(agents_subset) + len(outer_agents) == len(agents)
        print(
            f'\r[nei calc] | agents: {len(h_priority_agents): <3} / {len(agents_subset) + len(outer_agents)} | {runtime= : .2f} s.',
            end='')
# collisions: int = 0
# align_all_paths(h_priority_agents)
# for i in range(len(h_priority_agents[0].path)):
# to_count = False if constr_type == 'hard' else True
# collisions += check_vc_ec_neic_iter(h_priority_agents, i, to_count)
# if c_sum > 0:
# print(f'{c_sum=}')
def get_cp_graph(
        agents: List[AgentAlg],
        other_agents: List[AgentAlg] | None = None,
        prev_cp_graph: Dict[str, List[AgentAlg]] | None = None,
) -> Tuple[Dict[str, List[AgentAlg]], Dict[str, List[str]]]:
    """
    Build the collision-pair graph: agent name -> list of agents it conflicts with.

    - Every pair inside `agents` is checked with `two_equal_paths_have_confs`.
    - For each agent of `other_agents`, the neighbours recorded in
      `prev_cp_graph` that are NOT in `agents` are carried over unchanged, and
      its path is checked against every agent of `agents` with
      `two_plans_have_no_confs`.

    Only agents that have at least one neighbour get a key in the result.

    :param other_agents: agents outside `agents`; None means "none".
    :param prev_cp_graph: graph from the previous call; None means "empty".
    :return: (cp_graph, {}) - the second element is always an empty dict.
    """
    if other_agents is None:
        other_agents = []
    if prev_cp_graph is None:
        # None is the advertised default; without this the `in` lookup below
        # raised TypeError as soon as other_agents was non-empty.
        prev_cp_graph = {}

    cp_graph: Dict[str, List[AgentAlg]] = {}

    def _connect(first: AgentAlg, second: AgentAlg) -> None:
        # undirected edge: each agent is listed as the other's neighbour
        cp_graph.setdefault(first.name, []).append(second)
        cp_graph.setdefault(second.name, []).append(first)

    # align_all_paths(agents)
    for a1, a2 in combinations(agents, 2):
        if two_equal_paths_have_confs(a1.path, a2.path):
            _connect(a1, a2)

    for other_a in other_agents:
        # Conflicts between two "other" agents are carried over from the previous graph.
        carried = [nei for nei in prev_cp_graph.get(other_a.name, []) if nei not in agents]
        if carried:
            # create the entry only when something survives, so an agent whose
            # old conflicts were all with `agents` does not linger with an empty list
            cp_graph.setdefault(other_a.name, []).extend(carried)
        for a in agents:
            if not two_plans_have_no_confs(other_a.path, a.path):
                _connect(other_a, a)
    return cp_graph, {}
def get_outer_agent_via_random_walk(
        rand_agent: AgentAlg,
        agents_s: List[AgentAlg],
        occupied_from: Dict[str, AgentAlg]
) -> AgentAlg:
    """
    Random-walk over the map until an agent outside `agents_s` is met.

    The walk starts at a random node of `rand_agent.path` and, at every step,
    moves to a random entry of the current node's `neighbours_nodes`. It stops
    at the first node whose `xy_name` is a key of `occupied_from` and whose
    mapped agent is not in `agents_s`; that agent is returned.

    NOTE: there is no step limit - the walk ends only when such a node is hit.
    """

    def _outsider_at(node) -> bool:
        if node.xy_name not in occupied_from:
            return False
        return occupied_from[node.xy_name] not in agents_s

    curr_node: Node = random.choice(rand_agent.path)
    while not _outsider_at(curr_node):
        curr_node = random.choice(curr_node.neighbours_nodes)
    return occupied_from[curr_node.xy_name]
def get_agent_s_from_random_walk(
        curr_agent: AgentAlg,
        cp_graph: Dict[str, List[AgentAlg]],
        n_neighbourhood: int,
) -> List[AgentAlg]:
    """
    Pick `n_neighbourhood` distinct agents by random-walking the collision graph.

    Starting from `curr_agent`, the walk repeatedly jumps to a random
    neighbour (taken from `cp_graph`) of the agent it currently stands on.
    Each visited agent that is not selected yet is selected with probability
    0.7. `curr_agent` itself is selected only if the walk comes back to it.

    NOTE: the loop ends only once enough agents are selected, so the walk must
    be able to reach at least `n_neighbourhood` different agents.
    """
    selected: List[AgentAlg] = []
    walker: AgentAlg = curr_agent
    while len(selected) < n_neighbourhood:
        walker = random.choice(cp_graph[walker.name])
        if walker in selected:
            continue
        # the random draw happens only for not-yet-selected agents
        if random.random() < 0.7:
            selected.append(walker)
    return selected
def get_agents_subset(
        cp_graph: Dict[str, List[AgentAlg]],
        cp_graph_names: Dict[str, List[str]],
        n_neighbourhood: int,
        agents: List[AgentAlg],
        occupied_from: Dict[str, AgentAlg],
        h_dict: Dict[str, np.ndarray],
) -> List[AgentAlg]:
    """
    Choose the neighbourhood (subset of agents) to re-plan next.

    A random agent that appears in `cp_graph` is picked and the connected
    component of the collision graph around it is collected.
      - Component larger than `n_neighbourhood`: the subset comes from
        `get_agent_s_from_random_walk` started at the picked agent.
      - Otherwise: the whole component is taken and topped up with agents
        found by `get_outer_agent_via_random_walk`. Topping up stops early
        when `occupied_from` holds no agent outside the subset, so the result
        may be shorter than `n_neighbourhood` instead of looping forever.

    Side effect: the neighbour lists inside `cp_graph` are shuffled in place.
    `cp_graph_names` and `h_dict` are accepted but not used.

    Raises IndexError (from `random.choice`) if no agent of `agents` is in `cp_graph`.
    """
    agents_with_cp: List[AgentAlg] = [a for a in agents if a.name in cp_graph]
    curr_agent: AgentAlg = random.choice(agents_with_cp)

    # collect the connected component that contains curr_agent
    # (not necessarily the largest component of the graph)
    component: List[AgentAlg] = []
    l_open = deque([curr_agent])
    while l_open:
        next_a = l_open.pop()
        # heappush is kept from the original so the resulting order is unchanged;
        # it relies on agents being comparable with `<`
        heapq.heappush(component, next_a)
        random.shuffle(cp_graph[next_a.name])
        for nei_a in cp_graph[next_a.name]:
            if nei_a not in component and nei_a not in l_open:
                l_open.append(nei_a)

    if len(component) > n_neighbourhood:
        return get_agent_s_from_random_walk(curr_agent, cp_graph, n_neighbourhood)

    # add until N agents
    agents_s: List[AgentAlg] = list(component)
    while len(agents_s) < n_neighbourhood:
        if all(a in agents_s for a in occupied_from.values()):
            # nobody left to recruit - the random walk below could never return
            break
        rand_agent = random.choice(agents_s)
        outer_agent = get_outer_agent_via_random_walk(rand_agent, agents_s, occupied_from)
        agents_s.append(outer_agent)
    return agents_s
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# K LIMITED Functions
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def create_k_limit_init_solution(
        agents: List[AgentAlg],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        pf_alg_name: str,
        pf_alg,
        k_limit: int,
        start_time: int | float,
        vc_empty_np, ec_empty_np, pc_empty_np,
        params,
):
    """
    Plan a k-limited path (`agent.k_path`) for every agent, in list order.

    `pf_alg_name` decides where the already planned paths are recorded:
      - 'sipps':  in freshly created soft tables and in the `si_table`; the
                  given *_empty_np tables are passed as the hard ones;
      - 'a_star': in freshly created hard tables; the given *_empty_np tables
                  are passed as the soft ones.
    Any other name raises RuntimeError('nono').

    Each agent is planned from its `curr_node`. If `pf_alg` returns no path,
    the single-node path `[agent.curr_node]` is used. Every path goes through
    `align_path(..., k_limit + 1)` before it is stored and registered.

    :return: `(None, {'agents': agents})` as soon as the elapsed time since
        `start_time` reaches `params['max_time']`; otherwise nothing (None).
    """
    # compared with elapsed seconds below, i.e. used as a number
    max_time = params['max_time']
    planned: List[AgentAlg] = []
    si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)

    sipps_mode = pf_alg_name == 'sipps'
    if not sipps_mode and pf_alg_name != 'a_star':
        raise RuntimeError('nono')

    if sipps_mode:
        vc_hard_np, ec_hard_np, pc_hard_np = vc_empty_np, ec_empty_np, pc_empty_np
        vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, k_limit + 1)
    else:
        vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, k_limit + 1)
        vc_soft_np, ec_soft_np, pc_soft_np = vc_empty_np, ec_empty_np, pc_empty_np

    for agent in agents:
        if time.time() - start_time >= max_time:
            return None, {'agents': agents}

        found_path, _ = pf_alg(
            agent.curr_node, agent.goal_node, nodes, nodes_dict, h_dict,
            vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np,
            flag_k_limit=True, k_limit=k_limit, agent=agent, si_table=si_table
        )
        # no path found -> fall back to the single-node path at the current node
        k_path = align_path([agent.curr_node] if found_path is None else found_path, k_limit + 1)
        agent.k_path = k_path[:]
        planned.append(agent)

        if sipps_mode:
            update_constraints(k_path, vc_soft_np, ec_soft_np, pc_soft_np)
            si_table = update_si_table_soft(k_path, si_table, consider_pc=False)
        else:
            update_constraints(k_path, vc_hard_np, ec_hard_np, pc_hard_np)
def get_k_limit_cp_graph(
        agents: List[AgentAlg],
        other_agents: List[AgentAlg] | None = None,
        prev_cp_graph: Dict[str, List[AgentAlg]] | None = None,
        k_limit: int = int(1e10)
) -> Tuple[Dict[str, List[AgentAlg]], Dict[str, List[str]]]:
    """
    Build the collision-pair graph over the k-limited paths (`k_path`).

    A pair is skipped without comparing paths when
    `exceeds_k_dist(<curr_node of both agents>, k_limit + 1)` is true;
    otherwise the two `k_path`s are checked with `two_equal_paths_have_confs`.

    - Every pair inside `agents` is examined.
    - For each agent of `other_agents`, the neighbours recorded in
      `prev_cp_graph` that are NOT in `agents` are carried over unchanged, and
      the agent is examined against every agent of `agents`.

    Only agents that have at least one neighbour get a key in the result.

    :param other_agents: agents outside `agents`; None means "none".
    :param prev_cp_graph: graph from the previous call; None means "empty".
    :return: (cp_graph, {}) - the second element is always an empty dict.
    """
    if other_agents is None:
        other_agents = []
    if prev_cp_graph is None:
        # None is the advertised default; without this the `in` lookup below
        # raised TypeError as soon as other_agents was non-empty.
        prev_cp_graph = {}

    cp_graph: Dict[str, List[AgentAlg]] = {}

    def _connect(first: AgentAlg, second: AgentAlg) -> None:
        # undirected edge: each agent is listed as the other's neighbour
        cp_graph.setdefault(first.name, []).append(second)
        cp_graph.setdefault(second.name, []).append(first)

    # align_all_paths(agents)
    for a1, a2 in combinations(agents, 2):
        if exceeds_k_dist(a1.curr_node, a2.curr_node, k_limit + 1):
            continue
        if two_equal_paths_have_confs(a1.k_path, a2.k_path):
            _connect(a1, a2)

    for other_a in other_agents:
        # Conflicts between two "other" agents are carried over from the previous graph.
        carried = [nei for nei in prev_cp_graph.get(other_a.name, []) if nei not in agents]
        if carried:
            # create the entry only when something survives, so an agent whose
            # old conflicts were all with `agents` does not linger with an empty list
            cp_graph.setdefault(other_a.name, []).extend(carried)
        for a in agents:
            if exceeds_k_dist(other_a.curr_node, a.curr_node, k_limit + 1):
                continue
            if two_equal_paths_have_confs(other_a.k_path, a.k_path):
                _connect(other_a, a)
    return cp_graph, {}
def get_k_limit_outer_agent_via_random_walk(
        rand_agent: AgentAlg,
        agents_s: List[AgentAlg],
        occupied_from: Dict[str, AgentAlg]
) -> AgentAlg:
    """
    Random-walk over the map until an agent outside `agents_s` is accepted.

    The walk starts at a random node of `rand_agent.k_path` and, at every
    step, moves to a random entry of the current node's `neighbours_nodes`.
    A node qualifies when its `xy_name` is a key of `occupied_from` and the
    mapped agent is not in `agents_s`; a qualifying node is then accepted with
    probability 0.7, otherwise the walk simply continues.

    NOTE: there is no step limit - the walk ends only on an accepted node.
    """

    def _accepted(node) -> bool:
        if node.xy_name not in occupied_from:
            return False
        if occupied_from[node.xy_name] in agents_s:
            return False
        # the random draw happens only for qualifying nodes
        return random.random() < 0.7

    curr_node: Node = random.choice(rand_agent.k_path)
    while not _accepted(curr_node):
        curr_node = random.choice(curr_node.neighbours_nodes)
    return occupied_from[curr_node.xy_name]
def get_k_limit_agents_subset(
        cp_graph: Dict[str, List[AgentAlg]],
        cp_graph_names: Dict[str, List[str]],
        n_neighbourhood: int,
        agents: List[AgentAlg],
        occupied_from: Dict[str, AgentAlg],
        h_dict: Dict[str, np.ndarray],
) -> List[AgentAlg]:
    """
    Choose the neighbourhood (subset of agents) to re-plan next, k-limited variant.

    A random agent that appears in `cp_graph` is picked and the connected
    component of the collision graph around it is collected.
      - Component larger than `n_neighbourhood`: the subset comes from
        `get_agent_s_from_random_walk` started at the picked agent.
      - Otherwise: the whole component is taken and topped up with agents
        found by `get_k_limit_outer_agent_via_random_walk`. Topping up stops
        early when `occupied_from` holds no agent outside the subset, so the
        result may be shorter than `n_neighbourhood` instead of looping forever.

    Side effect: the neighbour lists inside `cp_graph` are shuffled in place.
    `cp_graph_names` and `h_dict` are accepted but not used.

    Raises IndexError (from `random.choice`) if no agent of `agents` is in `cp_graph`.
    """
    agents_with_cp: List[AgentAlg] = [a for a in agents if a.name in cp_graph]
    curr_agent: AgentAlg = random.choice(agents_with_cp)

    # collect the connected component that contains curr_agent
    # (not necessarily the largest component of the graph)
    component: List[AgentAlg] = []
    l_open = deque([curr_agent])
    while l_open:
        next_a = l_open.pop()
        # heappush is kept from the original so the resulting order is unchanged;
        # it relies on agents being comparable with `<`
        heapq.heappush(component, next_a)
        random.shuffle(cp_graph[next_a.name])
        for nei_a in cp_graph[next_a.name]:
            if nei_a not in component and nei_a not in l_open:
                l_open.append(nei_a)

    if len(component) > n_neighbourhood:
        # print('from neigh')
        return get_agent_s_from_random_walk(curr_agent, cp_graph, n_neighbourhood)

    # add until N agents
    agents_s: List[AgentAlg] = list(component)
    while len(agents_s) < n_neighbourhood:
        if all(a in agents_s for a in occupied_from.values()):
            # nobody left to recruit - the random walk below could never return
            break
        rand_agent = random.choice(agents_s)
        outer_agent = get_k_limit_outer_agent_via_random_walk(rand_agent, agents_s, occupied_from)
        agents_s.append(outer_agent)
    # print(f'\nstrangers, [{agents_s}]')
    return agents_s
def solve_k_limit_subset_with_prp(
        agents_subset: List[AgentAlg],
        outer_agents: List[AgentAlg],
        nodes: List[Node],
        nodes_dict: Dict[str, Node],
        h_dict: Dict[str, np.ndarray],
        map_dim: Tuple[int, int],
        start_time: int | float,
        pf_alg_name: str,
        pf_alg,
        vc_empty_np, ec_empty_np, pc_empty_np,
        k_limit: int = int(1e10),
        agents: List[AgentAlg] | None = None,
) -> None:
    """
    Re-plan the k-limited paths (`k_path`) of `agents_subset`, one agent at a time.

    `pf_alg_name` decides where the paths of `outer_agents` and of already
    re-planned subset agents are recorded:
      - 'sipps':  in a fresh soft edge table and in the `si_table`; the given
                  *_empty_np tables serve as all hard tables and as the soft
                  vertex / pc tables;
      - 'a_star': in fresh hard tables; the given *_empty_np tables are passed
                  as the soft ones.
    Any other name raises RuntimeError('nono').

    Each agent is planned from its `curr_node`. If `pf_alg` returns no path,
    the single-node path `[agent.curr_node]` is used. Every path goes through
    `align_path(..., k_limit + 1)` before it is stored and registered.

    Side effects: `agents_subset` is shuffled in place and every agent in it
    gets a new `k_path`. `start_time` and `agents` are not used here.
    """
    planned: List[AgentAlg] = outer_agents[:]
    si_table: Dict[str, List[Tuple[int, int, str]]] = init_si_table(nodes)

    sipps_mode = pf_alg_name == 'sipps'
    if not sipps_mode and pf_alg_name != 'a_star':
        raise RuntimeError('nono')

    if sipps_mode:
        vc_hard_np, ec_hard_np, pc_hard_np = vc_empty_np, ec_empty_np, pc_empty_np
        vc_soft_np, pc_soft_np = vc_empty_np, pc_empty_np
        ec_soft_np = init_ec_table(map_dim, k_limit + 1)
        for fixed_agent in planned:
            update_ec_table(fixed_agent.k_path, ec_soft_np)
            si_table = update_si_table_soft(fixed_agent.k_path, si_table, consider_pc=False)
    else:
        vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, k_limit + 1)
        vc_soft_np, ec_soft_np, pc_soft_np = vc_empty_np, ec_empty_np, pc_empty_np
        for fixed_agent in planned:
            update_constraints(fixed_agent.k_path, vc_hard_np, ec_hard_np, pc_hard_np)

    # the table names are never rebound inside the loop, so they can be bundled once
    constraint_tables = (vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np)

    random.shuffle(agents_subset)
    for agent in agents_subset:
        found_path, _ = pf_alg(
            agent.curr_node, agent.goal_node, nodes, nodes_dict, h_dict,
            *constraint_tables,
            flag_k_limit=True, k_limit=k_limit, agent=agent, si_table=si_table
        )
        # no path found -> fall back to the single-node path at the current node
        k_path = align_path([agent.curr_node] if found_path is None else found_path, k_limit + 1)
        agent.k_path = k_path[:]
        planned.append(agent)

        if sipps_mode:
            update_ec_table(k_path, ec_soft_np)
            si_table = update_si_table_soft(k_path, si_table, consider_pc=False)
        else:
            update_constraints(k_path, vc_hard_np, ec_hard_np, pc_hard_np)
# checks
# runtime = time.time() - start_time
# assert len(agents_subset) + len(outer_agents) == len(agents)
# print(f'\r[LNS neigh calc] | agents: {len(h_priority_agents): <3} / {len(agents_subset) + len(outer_agents)} | {runtime=: .2f} s.',end='') # , end=''
# collisions: int = 0
# align_all_paths(h_priority_agents)
# for i in range(len(h_priority_agents[0].path)):
# to_count = False if constr_type == 'hard' else True
# collisions += check_vc_ec_neic_iter(h_priority_agents, i, to_count)
# if c_sum > 0:
# print(f'{c_sum=}')
# def create_hard_and_soft_constraints(h_priority_agents: List[AgentAlg], map_dim: Tuple[int, int], constr_type: str):
# assert constr_type in ['hard', 'soft']
# if len(h_priority_agents) == 0:
# max_path_len = 1
# vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len)
# vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len)
# # vc_hard_np, ec_hard_np, pc_hard_np = None, None, None
# # vc_soft_np, ec_soft_np, pc_soft_np = None, None, None
# return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np
# max_path_len = max([len(a.path) for a in h_priority_agents])
# paths = [a.path for a in h_priority_agents]
# if constr_type == 'hard':
# vc_hard_np, ec_hard_np, pc_hard_np = create_constraints(paths, map_dim, max_path_len)
# vc_soft_np, ec_soft_np, pc_soft_np = init_constraints(map_dim, max_path_len)
# # vc_soft_np, ec_soft_np, pc_soft_np = None, None, None
# elif constr_type == 'soft':
# vc_hard_np, ec_hard_np, pc_hard_np = init_constraints(map_dim, max_path_len)
# # vc_hard_np, ec_hard_np, pc_hard_np = None, None, None
# vc_soft_np, ec_soft_np, pc_soft_np = create_constraints(paths, map_dim, max_path_len)
# else:
# raise RuntimeError('nope')
# return vc_hard_np, ec_hard_np, pc_hard_np, vc_soft_np, ec_soft_np, pc_soft_np