MAPF_Solver / functions.py
ArseniyPerchik's picture
more
ad7641a
from globals import *
def load_locations(loc_string: str):
try:
loc_dict = json.loads(loc_string)
return loc_dict
except json.JSONDecodeError:
return 'None'
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# HELP FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def save_results(logs_dict: dict):
alg_names = logs_dict['alg_names']
i_problems = logs_dict['i_problems']
img_dir = logs_dict['img_dir']
expr_type = logs_dict['expr_type']
file_dir = f'logs_for_experiments/{expr_type}_{datetime.now().strftime("%Y-%m-%d--%H-%M")}_ALGS-{len(alg_names)}_RUNS-{i_problems}_MAP-{img_dir[:-4]}.json'
# Serializing json
json_object = json.dumps(logs_dict, indent=4)
with open(file_dir, "w") as outfile:
outfile.write(json_object)
print(f'Results saved in: {file_dir}')
return file_dir
def set_seed(random_seed_bool, seed=1):
if random_seed_bool:
seed = random.randint(0, 10000)
random.seed(seed)
np.random.seed(seed)
print(f'[SEED]: --- {seed} ---')
def get_dims_from_pic(file_dir):
with open(f'{file_dir}') as f:
lines = f.readlines()
height = int(re.search(r'\d+', lines[1]).group())
width = int(re.search(r'\d+', lines[2]).group())
return height, width
def get_np_from_dot_map(file_dir):
with open(f'{file_dir}') as f:
lines = f.readlines()
height, width = get_dims_from_pic(file_dir)
img_np = np.zeros((height, width))
for height_index, line in enumerate(lines[4:]):
for width_index, curr_str in enumerate(line):
if curr_str == '.':
img_np[height_index, width_index] = 1
return img_np, (height, width)
def build_graph_from_np(img_np: np.ndarray, show_map: bool = False) -> Tuple[List[Node], Dict[str, Node]]:
# 0 - wall, 1 - free space
nodes = []
nodes_dict = {}
x_size, y_size = img_np.shape
# CREATE NODES
for i_x in range(x_size):
for i_y in range(y_size):
if img_np[i_x, i_y] == 1:
node = Node(i_x, i_y)
nodes.append(node)
nodes_dict[node.xy_name] = node
# CREATE NEIGHBOURS
for node1, node2 in combinations(nodes, 2):
if abs(node1.x - node2.x) > 1 or abs(node1.y - node2.y) > 1:
continue
if abs(node1.x - node2.x) == 1 and abs(node1.y - node2.y) == 1:
continue
node1.neighbours.append(node2.xy_name)
node2.neighbours.append(node1.xy_name)
# dist = distance_nodes(node1, node2)
# if dist == 1:
for node in nodes:
node.neighbours.append(node.xy_name)
heapq.heapify(node.neighbours)
for node in nodes:
for nei in node.neighbours:
node.neighbours_nodes.append(nodes_dict[nei])
if show_map:
plt.imshow(img_np, cmap='gray', origin='lower')
plt.show()
# plt.pause(1)
# plt.close()
return nodes, nodes_dict
def exctract_h_dict(img_dir) -> Dict[str, np.ndarray]:
# print(f'Started to build heuristic for {kwargs['img_dir'][:-4]}...')
# possible_dir = f"{path}/h_dict_of_{img_dir[:-4]}.json"
doc = perfect_heuristics_col.find_one({"name": img_dir})
h_dict = doc['h_dict']
for k, v in h_dict.items():
h_dict[k] = np.array(v)
return h_dict
# if there is one
# if os.path.exists(possible_dir):
# # Opening JSON file
# with open(possible_dir, 'r') as openfile:
# # Reading from json file
# h_dict = json.load(openfile)
# for k, v in h_dict.items():
# h_dict[k] = np.array(v)
# return h_dict
#
# raise RuntimeError('nu nu')
def get_blocked_sv_map(img_dir: str, folder_dir: str = 'logs_for_freedom_maps'):
# possible_dir = f'{folder_dir}/blocked_{img_dir[:-4]}.npy'
possible_dir = f'{folder_dir}/blocked_v2_{img_dir[:-4]}.npy'
assert os.path.exists(possible_dir)
with open(possible_dir, 'rb') as f:
blocked_sv_map = np.load(f)
return blocked_sv_map
def get_sv_map(img_dir: str, folder_dir: str = 'logs_for_freedom_maps'):
possible_dir = f'{folder_dir}/{img_dir[:-4]}.npy'
assert os.path.exists(possible_dir)
with open(possible_dir, 'rb') as f:
sv_map = np.load(f)
return sv_map
def create_constraints(
paths: List[List[Node]], map_dim: Tuple[int, int], max_path_len: int
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
"""
vc_np: vertex constraints [x, y, t] = bool
ec_np: edge constraints [x, y, x, y, t] = bool
pc_np: permanent constraints [x, y] = int or -1
"""
if len(paths) == 0:
return None, None, None
if max_path_len == 0:
return None, None, None
vc_np = np.zeros((map_dim[0], map_dim[1], max_path_len))
ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
pc_np = np.ones((map_dim[0], map_dim[1])) * -1
for path in paths:
update_constraints(path, vc_np, ec_np, pc_np)
return vc_np, ec_np, pc_np
def init_constraints(
map_dim: Tuple[int, int], max_path_len: int
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
"""
vc_np: vertex constraints [x, y, t] = bool
ec_np: edge constraints [x, y, x, y, t] = bool
pc_np: permanent constraints [x, y] = int or -1
"""
if max_path_len == 0:
return None, None, None
vc_np = np.zeros((map_dim[0], map_dim[1], max_path_len))
ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
pc_np = np.ones((map_dim[0], map_dim[1])) * -1
return vc_np, ec_np, pc_np
def init_ec_table(
map_dim: Tuple[int, int], max_path_len: int
) -> np.ndarray | None:
"""
ec_np: edge constraints [x, y, x, y, t] = bool
"""
if max_path_len == 0:
return None, None, None
ec_np = np.zeros((map_dim[0], map_dim[1], map_dim[0], map_dim[1], max_path_len))
return ec_np
def update_ec_table(
path: List[Node], ec_np: np.ndarray
) -> np.ndarray | None:
"""
ec_np: edge constraints [x, y, x, y, t] = bool
"""
prev_n = path[0]
for t, n in enumerate(path):
# ec
ec_np[prev_n.x, prev_n.y, n.x, n.y, t] = 1
prev_n = n
return ec_np
def update_constraints(
path: List[Node], vc_np: np.ndarray, ec_np: np.ndarray, pc_np: np.ndarray
) -> Tuple[np.ndarray | None, np.ndarray | None, np.ndarray | None]:
"""
vc_np: vertex constraints [x, y, t] = bool
ec_np: edge constraints [x, y, x, y, t] = bool
pc_np: permanent constraints [x, y] = int or -1
"""
# pc
last_node = path[-1]
last_time = len(path) - 1
pc_np[last_node.x, last_node.y] = max(int(pc_np[last_node.x, last_node.y]), last_time)
prev_n = path[0]
for t, n in enumerate(path):
# vc
vc_np[n.x, n.y, t] = 1
# ec
ec_np[prev_n.x, prev_n.y, n.x, n.y, t] = 1
prev_n = n
return vc_np, ec_np, pc_np
def align_all_paths(agents: List, flag_k_limit: bool = False) -> int:
if len(agents) == 0:
return 1
if flag_k_limit:
max_len = max([len(a.k_path) for a in agents])
for a in agents:
while len(a.k_path) < max_len:
a.k_path.append(a.k_path[-1])
return max_len
max_len = max([len(a.path) for a in agents])
for a in agents:
while len(a.path) < max_len:
a.path.append(a.path[-1])
return max_len
def shorten_back_all_paths(agents: List) -> None:
for a in agents:
if len(a.path) <= 2:
continue
minus_1_node = a.path[-1]
minus_2_node = a.path[-2]
while minus_1_node == minus_2_node:
a.path = a.path[:-1]
if len(a.path) <= 2:
break
minus_1_node = a.path[-1]
minus_2_node = a.path[-2]
def check_one_vc_ec_neic_iter(path: List[Node], agent_name, other_paths: Dict[str, List[Node]], iteration: int) -> None:
collisions: int = 0
for agent_other_name, other_path in other_paths.items():
# vertex conf
assert path[iteration] != other_path[iteration], f'[i: {iteration}] vertex conf: {path[iteration].xy_name}'
# edge conf
prev_node1 = path[max(0, iteration - 1)]
curr_node1 = path[iteration]
prev_node2 = other_path[max(0, iteration - 1)]
curr_node2 = other_path[iteration]
edge1 = (prev_node1.x, prev_node1.y, curr_node1.x, curr_node1.y)
edge2 = (curr_node2.x, curr_node2.y, prev_node2.x, prev_node2.y)
# nei conf
assert path[iteration].xy_name in path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
assert edge1 != edge2, f'[i: {iteration}] edge collision: {edge1}'
assert path[iteration].xy_name in path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
def check_vc_ec_neic_iter(agents: list | Deque, iteration: int, to_count: bool = False) -> int:
collisions: int = 0
for a1, a2 in combinations(agents, 2):
# vertex conf
if not to_count:
assert a1.path[iteration] != a2.path[iteration], f'[i: {iteration}] vertex conf: {a1.name}-{a2.name} in {a1.path[iteration].xy_name}'
else:
if a1.path[iteration] == a2.path[iteration]:
collisions += 1
# edge conf
prev_node1 = a1.path[max(0, iteration - 1)]
curr_node1 = a1.path[iteration]
prev_node2 = a2.path[max(0, iteration - 1)]
curr_node2 = a2.path[iteration]
edge1 = (prev_node1.x, prev_node1.y, curr_node1.x, curr_node1.y)
edge2 = (curr_node2.x, curr_node2.y, prev_node2.x, prev_node2.y)
# nei conf
assert a1.path[iteration].xy_name in a1.path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
if not to_count:
assert edge1 != edge2, f'[i: {iteration}] edge collision: {a1.name}-{a2.name} in {edge1}'
else:
if edge1 == edge2:
collisions += 1
assert agents[-1].path[iteration].xy_name in agents[-1].path[max(0, iteration - 1)].neighbours, f'[i: {iteration}] wow wow wow! Not nei pos!'
return collisions
def check_configs(
agents: List[AgentAlg],
config_from: Dict[str, Node],
config_to: Dict[str, Node],
final_check: bool = True
) -> None:
if final_check:
for agent in agents:
assert agent.name in config_from
assert agent.name in config_to
for a1, a2 in combinations(agents, 2):
if a1.name not in config_to or a2.name not in config_to:
continue
# vertex conf
from_node_1: Node = config_from[a1.name]
to_node_1: Node = config_to[a1.name]
from_node_2: Node = config_from[a2.name]
to_node_2: Node = config_to[a2.name]
# vc
assert from_node_1 != from_node_2, f' vc: {a1.name}-{a2.name} in {from_node_1.xy_name}'
assert to_node_1 != to_node_2, f' vc: {a1.name}-{a2.name} in {to_node_2.xy_name}'
# edge conf
edge1 = (from_node_1.x, from_node_1.y, to_node_1.x, to_node_1.y)
edge2 = (to_node_2.x, to_node_2.y, from_node_2.x, from_node_2.y)
assert edge1 != edge2, f'ec: {a1.name}-{a2.name} in {edge1}'
# nei conf
assert from_node_1.xy_name in to_node_1.neighbours, f'neic {a1.name}: {from_node_1.xy_name} not nei of {to_node_1.xy_name}'
assert from_node_2.xy_name in to_node_2.neighbours, f'neic {a2.name}: {from_node_2.xy_name} not nei of {to_node_2.xy_name}'
def ranges_intersect(range1, range2):
start1, end1 = range1
start2, end2 = range2
return start1 <= end2 and start2 <= end1
def use_profiler(save_dir):
def decorator(func):
def inner1(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
# getting the returned value
returned_value = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler).sort_stats('cumtime')
stats.dump_stats(save_dir)
# returning the value to the original frame
return returned_value
return inner1
return decorator
def two_plans_have_no_confs(path1: List[Node], path2: List[Node]):
min_len = min(len(path1), len(path2))
# assert len(path1) == len(path2)
prev1 = None
prev2 = None
bigger_path = path1 if len(path1) >= len(path2) else path2
smaller_path = path1 if len(path1) < len(path2) else path2
if smaller_path[-1] in bigger_path[len(smaller_path) - 1:]:
return False
for i, (vertex1, vertex2) in enumerate(zip(path1[:min_len], path2[:min_len])):
if vertex1.x == vertex2.x and vertex1.y == vertex2.y:
return False
if i > 0:
# edge1 = (prev1.xy_name, vertex1.xy_name)
# edge2 = (vertex2.xy_name, prev2.xy_name)
# if (prev1.x, prev1.y, vertex1.x, vertex1.y) == (vertex2.x, vertex2.y, prev2.x, prev2.y):
if prev1.x == vertex2.x and prev1.y == vertex2.y and vertex1.x == prev2.x and vertex1.y == prev2.y:
return False
prev1 = vertex1
prev2 = vertex2
return True
def two_equal_paths_have_confs(path1: List[Node], path2: List[Node]):
assert len(path1) == len(path2)
from1 = None
from2 = None
for i, (to1, to2) in enumerate(zip(path1, path2)):
if to1.x == to2.x and to1.y == to2.y:
return True
if i > 0:
if from1.x == to2.x and from1.y == to2.y and to1.x == from2.x and to1.y == from2.y:
return True
from1 = to1
from2 = to2
return False
def create_agents(
start_nodes: List[Node], goal_nodes: List[Node]
) -> Tuple[List[AgentAlg], Dict[str, AgentAlg]]:
agents: List[AgentAlg] = []
agents_dict: Dict[str, AgentAlg] = {}
for num, (s_node, g_node) in enumerate(zip(start_nodes, goal_nodes)):
new_agent = AgentAlg(num, s_node, g_node)
agents.append(new_agent)
agents_dict[new_agent.name] = new_agent
return agents, agents_dict
def time_is_good(start_time: int | float, max_time: int) -> bool:
runtime = time.time() - start_time
return runtime < max_time
def align_path(new_path: List[Node], k_limit: int) -> List[Node]:
while len(new_path) < k_limit:
new_path.append(new_path[-1])
return new_path[:k_limit]
def manhattan_dist(n1: Node, n2: Node):
"""
The Manhattan Distance between two points (X1, Y1) and (X2, Y2) is given by |X1 – X2| + |Y1 – Y2|.
:return:
"""
return np.abs(n1.x - n2.x) + np.abs(n1.y - n2.y)
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# LIFELONG / k-LIMITED FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def add_k_paths_to_agents(agents: List[AgentAlg] | list) -> None:
for agent in agents:
if len(agent.path) == 0:
agent.path.append(agent.start_node)
agent.path.extend(agent.k_path[1:])
def update_goal_nodes(agents: List[AgentAlg] | list, nodes: List[Node]) -> int:
finished_goals = 0
goal_names: List[str] = [a.goal_node.xy_name for a in agents if a.goal_node is not None]
heapq.heapify(goal_names)
free_nodes: List[Node] = [n for n in nodes if n.xy_name not in goal_names]
for agent in agents:
to_change_goal: bool = False
if agent.goal_node is not None and agent.curr_node == agent.goal_node:
finished_goals += 1
to_change_goal = True
if agent.goal_node is None:
to_change_goal = True
if to_change_goal:
# if random.random() < 0.5:
# agent.goal_node = None
# continue
next_goal_node = random.choice(free_nodes)
while next_goal_node.xy_name in goal_names:
next_goal_node = random.choice(free_nodes)
agent.goal_node = next_goal_node
heapq.heappush(goal_names, next_goal_node.xy_name)
return finished_goals
def stay_k_path_agent(agent: AgentAlg | Any, n: Node, k_limit: int) -> None:
agent.k_path = [n]
while len(agent.k_path) < k_limit:
agent.k_path.append(n)
agent.k_path = agent.k_path[:k_limit]
def exceeds_k_dist(n1: Node, n2: Node, k_limit: int) -> bool:
dist_x = abs(n1.x - n2.x)
if dist_x > k_limit:
return True
dist_y = abs(n1.y - n2.y)
if dist_y > k_limit:
return True
if dist_x + dist_y > k_limit:
return True
return False
def repair_agents_k_paths(agents: List[AgentAlg] | list, k_limit: int) -> None:
standby_agents_dict: Dict[str, bool] = {}
for agent in agents:
if agent.k_path is None or len(agent.k_path) == 0:
stay_k_path_agent(agent, agent.curr_node, k_limit + 1)
standby_agents_dict[agent.name] = True
else:
standby_agents_dict[agent.name] = False
all_good = False
while not all_good:
all_good = True
for a1, a2 in combinations(agents, 2):
if standby_agents_dict[a1.name] and standby_agents_dict[a2.name]:
continue
if exceeds_k_dist(a1.curr_node, a2.curr_node, k_limit + 1):
continue
if two_equal_paths_have_confs(a1.k_path, a2.k_path):
stay_k_path_agent(a1, a1.curr_node, k_limit + 1)
stay_k_path_agent(a2, a2.curr_node, k_limit + 1)
standby_agents_dict[a1.name] = True
standby_agents_dict[a2.name] = True
all_good = False
break
print(' | repaired')
return
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# METRICS FUNCS
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
# -------------------------------------------------------------------------------------------------------------------- #
def shorten_back_path(input_path: List[Node]) -> List[Node]:
path = input_path[:]
if len(path) <= 2:
return path
minus_1_node = path[-1]
minus_2_node = path[-2]
while minus_1_node == minus_2_node:
path = path[:-1]
if len(path) <= 2:
break
minus_1_node = path[-1]
minus_2_node = path[-2]
return path
def get_makespan_metric(paths_dict: Dict[str, List[Node]]) -> int:
old_paths = list(paths_dict.values())
paths: List[List[Node]] = []
for path in old_paths:
paths.append(shorten_back_path(path))
makespan: int = max([len(path) for path in paths])
return makespan
def get_soc_metric(paths_dict: Dict[str, List[Node]]) -> int:
old_paths = list(paths_dict.values())
paths: List[List[Node]] = []
for path in old_paths:
paths.append(shorten_back_path(path))
soc: int = sum([len(path) for path in paths])
return soc