import os import random import time import networkx as nx import numpy as np import constants from gen.utils import game_util MAX_WEIGHT_IN_GRAPH = 1e5 PRED_WEIGHT_THRESH = 10 EPSILON = 1e-4 # Direction: 0: north, 1: east, 2: south, 3: west class Graph(object): def __init__(self, use_gt=False, construct_graph=True, scene_id=None, debug=False, env=None): t_start = time.time() # Find self.construct_graph = construct_graph # event = env.step(action='GetReachablePositions') # new_reachable_positions = event.metadata['actionReturn'] # points = [] # for point in new_reachable_positions: # xx = int(point['x'] / constants.AGENT_STEP_SIZE) # yy = int(point['z'] / constants.AGENT_STEP_SIZE) # points.append([xx, yy]) # self.points = np.array(points, dtype=np.int32) # self.points = self.points[np.lexsort(self.points.T)] # Use navigation results already made self.scene_id = scene_id self.points = np.load(os.path.join( os.path.dirname(__file__), os.pardir, 'layouts', 'FloorPlan%s-layout.npy' % self.scene_id)) self.points /= constants.AGENT_STEP_SIZE self.points = np.round(self.points).astype(np.int32) self.xMin = self.points[:, 0].min() - constants.SCENE_PADDING * 2 self.yMin = self.points[:, 1].min() - constants.SCENE_PADDING * 2 self.xMax = self.points[:, 0].max() + constants.SCENE_PADDING * 2 self.yMax = self.points[:, 1].max() + constants.SCENE_PADDING * 2 self.memory = np.zeros((self.yMax - self.yMin + 1, self.xMax - self.xMin + 1), dtype=np.float32) self.gt_graph = None self.shortest_paths = {} self.shortest_paths_unweighted = {} self.use_gt = use_gt self.impossible_spots = set() self.updated_weights = {} self.prev_navigable_locations = None if self.use_gt: self.memory[:] = MAX_WEIGHT_IN_GRAPH self.memory[self.points[:, 1] - self.yMin, self.points[:, 0] - self.xMin] = 1 + EPSILON else: self.memory[:] = 1 self.memory[:, :int(constants.SCENE_PADDING * 1.5)] = MAX_WEIGHT_IN_GRAPH self.memory[:int(constants.SCENE_PADDING * 1.5), :] = MAX_WEIGHT_IN_GRAPH self.memory[:, -int(constants.SCENE_PADDING * 1.5):] = MAX_WEIGHT_IN_GRAPH self.memory[-int(constants.SCENE_PADDING * 1.5):, :] = MAX_WEIGHT_IN_GRAPH if self.gt_graph is None: self.gt_graph = nx.DiGraph() if self.construct_graph: for yy in np.arange(self.yMin, self.yMax + 1): for xx in np.arange(self.xMin, self.xMax + 1): weight = self.memory[yy - self.yMin, xx - self.xMin] for direction in range(4): node = (xx, yy, direction) back_direction = (direction + 2) % 4 back_node = (xx, yy, back_direction) self.gt_graph.add_edge(node, (xx, yy, (direction + 1) % 4), weight=1) self.gt_graph.add_edge(node, (xx, yy, (direction - 1) % 4), weight=1) forward_node = None if direction == 0 and yy != self.yMax: forward_node = (xx, yy + 1, back_direction) elif direction == 1 and xx != self.xMax: forward_node = (xx + 1, yy, back_direction) elif direction == 2 and yy != self.yMin: forward_node = (xx, yy - 1, back_direction) elif direction == 3 and xx != self.xMin: forward_node = (xx - 1, yy, back_direction) if forward_node is not None: self.gt_graph.add_edge(forward_node, back_node, weight=weight) self.initial_memory = self.memory.copy() self.debug = debug if self.debug: print('Graph construction time %.3f' % (time.time() - t_start)) def clear(self): self.shortest_paths = {} self.shortest_paths_unweighted = {} self.impossible_spots = set() self.prev_navigable_locations = None if self.use_gt: self.memory[:] = self.initial_memory else: self.memory[:] = 1 self.memory[:, :int(constants.SCENE_PADDING * 1.5)] = MAX_WEIGHT_IN_GRAPH self.memory[:int(constants.SCENE_PADDING * 1.5), :] = MAX_WEIGHT_IN_GRAPH self.memory[:, -int(constants.SCENE_PADDING * 1.5):] = MAX_WEIGHT_IN_GRAPH self.memory[-int(constants.SCENE_PADDING * 1.5):, :] = MAX_WEIGHT_IN_GRAPH if self.construct_graph: for (nodea, nodeb), original_weight in self.updated_weights.items(): self.gt_graph[nodea][nodeb]['weight'] = original_weight self.updated_weights = {} @property def image(self): return self.memory[:, :].astype(np.uint8) def check_graph_memory_correspondence(self): # graph sanity check if self.construct_graph: for yy in np.arange(self.yMin, self.yMax + 1): for xx in np.arange(self.xMin, self.xMax + 1): for direction in range(4): back_direction = (direction + 2) % 4 back_node = (xx, yy, back_direction) if direction == 0 and yy != self.yMax: assert(abs(self.gt_graph[(xx, yy + 1, back_direction)][back_node]['weight'] - self.memory[int(yy - self.yMin), int(xx - self.xMin)]) < 0.0001) elif direction == 1 and xx != self.xMax: assert(abs(self.gt_graph[(xx + 1, yy, back_direction)][back_node]['weight'] - self.memory[int(yy - self.yMin), int(xx - self.xMin)]) < 0.0001) elif direction == 2 and yy != self.yMin: assert(abs(self.gt_graph[(xx, yy - 1, back_direction)][back_node]['weight'] - self.memory[int(yy - self.yMin), int(xx - self.xMin)]) < 0.0001) elif direction == 3 and xx != self.xMin: assert(abs(self.gt_graph[(xx - 1, yy, back_direction)][back_node]['weight'] - self.memory[int(yy - self.yMin), int(xx - self.xMin)]) < 0.0001) print('\t\t\tgraph tested successfully') def update_graph(self, graph_patch, pose): graph_patch, curr_val = graph_patch curr_val = np.array(curr_val) # Rotate the array to get its global coordinate frame orientation. rotation = int(pose[2]) assert(rotation in {0, 1, 2, 3}), 'rotation was %s' % str(rotation) if rotation != 0: graph_patch = np.rot90(graph_patch, rotation) # Shift offsets to global coordinate frame. if rotation == 0: x_min = pose[0] - int(constants.STEPS_AHEAD / 2) y_min = pose[1] + 1 elif rotation == 1: x_min = pose[0] + 1 y_min = pose[1] - int(constants.STEPS_AHEAD / 2) elif rotation == 2: x_min = pose[0] - int(constants.STEPS_AHEAD / 2) y_min = pose[1] - constants.STEPS_AHEAD elif rotation == 3: x_min = pose[0] - constants.STEPS_AHEAD y_min = pose[1] - int(constants.STEPS_AHEAD / 2) else: raise Exception('Invalid pose direction') if self.construct_graph: for yi, yy in enumerate(range(y_min, y_min + constants.STEPS_AHEAD)): for xi, xx in enumerate(range(x_min, x_min + constants.STEPS_AHEAD)): self.update_weight(xx, yy, graph_patch[yi, xi, 0]) self.update_weight(pose[0], pose[1], curr_val[0]) def get_graph_patch(self, pose): rotation = int(pose[2]) assert(rotation in {0, 1, 2, 3}) if rotation == 0: x_min = pose[0] - int(constants.STEPS_AHEAD / 2) y_min = pose[1] + 1 elif rotation == 1: x_min = pose[0] + 1 y_min = pose[1] - int(constants.STEPS_AHEAD / 2) elif rotation == 2: x_min = pose[0] - int(constants.STEPS_AHEAD / 2) y_min = pose[1] - constants.STEPS_AHEAD elif rotation == 3: x_min = pose[0] - constants.STEPS_AHEAD y_min = pose[1] - int(constants.STEPS_AHEAD / 2) else: raise Exception('Invalid pose direction') x_min -= self.xMin y_min -= self.yMin graph_patch = self.memory[y_min:y_min + constants.STEPS_AHEAD, x_min:x_min + constants.STEPS_AHEAD].copy() if rotation != 0: graph_patch = np.rot90(graph_patch, -rotation) return graph_patch, self.memory[pose[1] - self.yMin, pose[0] - self.xMin].copy() def add_impossible_spot(self, spot): self.update_weight(spot[0], spot[1], MAX_WEIGHT_IN_GRAPH) self.impossible_spots.add(spot) def update_weight(self, xx, yy, weight): if (xx, yy) not in self.impossible_spots: if self.construct_graph: for direction in range(4): node = (xx, yy, direction) self.update_edge(node, weight) self.memory[yy - self.yMin, xx - self.xMin] = weight self.shortest_paths = {} def update_edge(self, pose, weight): rotation = int(pose[2]) assert(rotation in {0, 1, 2, 3}) (xx, yy, direction) = pose back_direction = (direction + 2) % 4 back_pose = (xx, yy, back_direction) if direction == 0 and yy != self.yMax: forward_pose = (xx, yy + 1, back_direction) elif direction == 1 and xx != self.xMax: forward_pose = (xx + 1, yy, back_direction) elif direction == 2 and yy != self.yMin: forward_pose = (xx, yy - 1, back_direction) elif direction == 3 and xx != self.xMin: forward_pose = (xx - 1, yy, back_direction) else: raise NotImplementedError('Unknown direction') if (forward_pose, back_pose) not in self.updated_weights: self.updated_weights[(forward_pose, back_pose)] = self.gt_graph[forward_pose][back_pose]['weight'] self.gt_graph[forward_pose][back_pose]['weight'] = weight def get_shortest_path(self, pose, goal_pose): assert(pose[2] in {0, 1, 2, 3}) assert(goal_pose[2] in {0, 1, 2, 3}) # Store horizons for possible final look correction. curr_horizon = int(pose[3]) goal_horizon = int(goal_pose[3]) pose = tuple(int(pp) for pp in pose[:3]) goal_pose = tuple(int(pp) for pp in goal_pose[:3]) try: assert(self.construct_graph), 'Graph was not constructed, cannot get shortest path.' assert(pose in self.gt_graph), 'start point not in graph' assert(goal_pose in self.gt_graph), 'start point not in graph' except Exception as ex: print('pose', pose, 'goal_pose', goal_pose) raise ex if (pose, goal_pose) not in self.shortest_paths: path = nx.astar_path(self.gt_graph, pose, goal_pose, heuristic=lambda nodea, nodeb: (abs(nodea[0] - nodeb[0]) + abs(nodea[1] - nodeb[1]) + abs(nodea[2] - nodeb[2])), weight='weight') for ii, pp in enumerate(path): self.shortest_paths[(pp, goal_pose)] = path[ii:] path = self.shortest_paths[(pose, goal_pose)] max_point = 1 for ii in range(len(path) - 1): weight = self.gt_graph[path[ii]][path[ii + 1]]['weight'] if path[ii][:2] != path[ii + 1][:2]: if abs(self.memory[path[ii + 1][1] - self.yMin, path[ii + 1][0] - self.xMin] - weight) > 0.001: print(self.memory[path[ii + 1][1] - self.yMin, path[ii + 1][0] - self.xMin], weight) raise AssertionError('weights do not match') if weight >= PRED_WEIGHT_THRESH: break max_point += 1 path = path[:max_point] actions = [Graph.get_plan_move(path[ii], path[ii + 1]) for ii in range(len(path) - 1)] Graph.horizon_adjust(actions, path, curr_horizon, goal_horizon) return actions, path def get_shortest_path_unweighted(self, pose, goal_pose): assert(pose[2] in {0, 1, 2, 3}) assert(goal_pose[2] in {0, 1, 2, 3}) curr_horizon = int(pose[3]) goal_horizon = int(goal_pose[3]) pose = tuple(int(pp) for pp in pose[:3]) goal_pose = tuple(int(pp) for pp in goal_pose[:3]) try: assert(self.construct_graph), 'Graph was not constructed, cannot get shortest path.' assert(pose in self.gt_graph), 'start point not in graph' assert(goal_pose in self.gt_graph), 'start point not in graph' except Exception as ex: print('pose', pose, 'goal_pose', goal_pose) raise ex if (pose, goal_pose) not in self.shortest_paths_unweighted: # TODO: swap this out for astar (might be get_shortest_path tho) and update heuristic to account for # TODO: actual number of turns. path = nx.shortest_path(self.gt_graph, pose, goal_pose) for ii, pp in enumerate(path): self.shortest_paths_unweighted[(pp, goal_pose)] = path[ii:] path = self.shortest_paths_unweighted[(pose, goal_pose)] actions = [Graph.get_plan_move(path[ii], path[ii + 1]) for ii in range(len(path) - 1)] Graph.horizon_adjust(actions, path, curr_horizon, goal_horizon) return actions, path def update_map(self, env): event = env.step({'action': 'GetReachablePositions'}) new_reachable_positions = event.metadata['reachablePositions'] new_memory = np.full_like(self.memory[:, :], MAX_WEIGHT_IN_GRAPH) if self.construct_graph: for point in new_reachable_positions: xx = int(point['x'] / constants.AGENT_STEP_SIZE) yy = int(point['z'] / constants.AGENT_STEP_SIZE) new_memory[yy - self.yMin, xx - self.xMin] = 1 + EPSILON changed_locations = np.where(np.logical_xor(self.memory[:, :] == MAX_WEIGHT_IN_GRAPH, new_memory == MAX_WEIGHT_IN_GRAPH)) for location in zip(*changed_locations): self.update_weight(location[1] + self.xMin, location[0] + self.yMin, 1 + EPSILON) def navigate_to_goal(self, game_state, start_pose, end_pose): # Look down self.update_map(game_state.env) start_angle = start_pose[3] if start_angle > 180: start_angle -= 360 if start_angle != 45: # pitch angle # Perform initial tilt to get to 45 degrees. tilt_pose = [pp for pp in start_pose] tilt_pose[3] = 45 tilt_actions, _ = self.get_shortest_path(start_pose, tilt_pose) for action in tilt_actions: game_state.step(action) start_pose = tuple(tilt_pose) actions, path = self.get_shortest_path(start_pose, end_pose) while len(actions) > 0: for ii, (action, pose) in enumerate(zip(actions, path)): if 'forceAction' not in action: action['forceAction'] = True game_state.step(action) event = game_state.env.last_event last_action_success = event.metadata['lastActionSuccess'] if not last_action_success: # Can't traverse here, make sure the weight is correct. if action['action'].startswith('Look') or action['action'].startswith('Rotate'): raise Exception('Look action failed %s' % event.metadata['errorMessage']) self.add_impossible_spot(path[ii + 1]) break pose = game_util.get_pose(event) actions, path = self.get_shortest_path(pose, end_pose) print('nav done') @staticmethod def get_plan_move(pose0, pose1): if (pose0[2] + 1) % 4 == pose1[2]: action = {'action': 'RotateRight'} elif (pose0[2] - 1) % 4 == pose1[2]: action = {'action': 'RotateLeft'} else: action = {'action': 'MoveAhead', 'moveMagnitude': constants.AGENT_STEP_SIZE} return action @staticmethod def horizon_adjust(actions, path, hor0, hor1): if hor0 < hor1: for _ in range((hor1 - hor0) // constants.AGENT_HORIZON_ADJ): actions.append({'action': 'LookDown'}) path.append(path[-1]) elif hor0 > hor1: for _ in range((hor0 - hor1) // constants.AGENT_HORIZON_ADJ): actions.append({'action': 'LookUp',}) path.append(path[-1])