Spaces:
Paused
Paused
| from multiprocessing import Pool | |
| from tqdm import tqdm | |
| from pathlib import Path | |
| import numpy as np | |
| from collections import deque | |
| import argparse | |
| import cv2 | |
| def get_raycast_building_mask(building_grid): | |
| laser_range = 200 | |
| num_laser = 100 | |
| robot_pos = (building_grid.shape[0] // 2-1, building_grid.shape[1] // 2 - 1) | |
| unoccupied_pos = np.stack(np.where(building_grid != 1), axis=1) | |
| if len(unoccupied_pos) == 0: | |
| return None | |
| l2_dist = unoccupied_pos - [robot_pos[0], robot_pos[1]] | |
| closest = ((l2_dist ** 2).sum(1)**0.5).argmin() | |
| robot_pos = (unoccupied_pos[closest][0], unoccupied_pos[closest][1]) | |
| free_points, hit_points, actual_hit_points = get_free_points_in_front(building_grid, robot_pos, laser_range=laser_range, num_laser=num_laser) | |
| free_points[:, 0][free_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1 | |
| free_points[:, 1][free_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1 | |
| free_points[:, 0][free_points[:, 0] < 0] = 0 | |
| free_points[:, 1][free_points[:, 1] < 0] = 0 | |
| hit_points[:, 0][hit_points[:, 0] >= building_grid.shape[0]] = building_grid.shape[0] - 1 | |
| hit_points[:, 1][hit_points[:, 1] >= building_grid.shape[1]] = building_grid.shape[1] - 1 | |
| hit_points[:, 0][hit_points[:, 0] < 0] = 0 | |
| hit_points[:, 1][hit_points[:, 1] < 0] = 0 | |
| if len(free_points) > 0: | |
| # Get vis mask by flood filling free space boundary | |
| inited_flood_grid = init_flood_fill(robot_pos, hit_points, building_grid.shape) | |
| inited_flood_grid = (inited_flood_grid * 255).astype(np.uint8).copy() | |
| # pick a seed point from free points, that is not 0 in inited_flood_grid. We want it to be unknown | |
| np.random.shuffle(free_points) | |
| for i in range(len(free_points)): | |
| seed_point = free_points[i] | |
| if inited_flood_grid[seed_point[0], seed_point[1]] != 0: | |
| break # Found a valid seed point, exit the loop | |
| else: | |
| print('Unable to find a valid seed point') | |
| return None | |
| num_filled, flooded_image, mask, bounding_box = cv2.floodFill(inited_flood_grid.copy(), None, seedPoint=(seed_point[1], seed_point[0]), newVal=0) | |
| # name = names[batch_ind][-1] | |
| return flooded_image | |
| else: | |
| print("No free points") | |
| return None | |
| def flood_fill_simple(center_point, occupancy_map): | |
| """ | |
| center_point: starting point (x,y) of fill | |
| occupancy_map: occupancy map generated from Bresenham ray-tracing | |
| """ | |
| # Fill empty areas with queue method | |
| occupancy_map = np.copy(occupancy_map) | |
| sx, sy = occupancy_map.shape | |
| fringe = deque() | |
| fringe.appendleft(center_point) | |
| while fringe: | |
| n = fringe.pop() | |
| nx, ny = n | |
| unknown_val = 0.5 | |
| # West | |
| if nx > 0: | |
| if occupancy_map[nx - 1, ny] == unknown_val: | |
| occupancy_map[nx - 1, ny] = 0 | |
| fringe.appendleft((nx - 1, ny)) | |
| # East | |
| if nx < sx - 1: | |
| if occupancy_map[nx + 1, ny] == unknown_val: | |
| occupancy_map[nx + 1, ny] = 0 | |
| fringe.appendleft((nx + 1, ny)) | |
| # North | |
| if ny > 0: | |
| if occupancy_map[nx, ny - 1] == unknown_val: | |
| occupancy_map[nx, ny - 1] = 0 | |
| fringe.appendleft((nx, ny - 1)) | |
| # South | |
| if ny < sy - 1: | |
| if occupancy_map[nx, ny + 1] == unknown_val: | |
| occupancy_map[nx, ny + 1] = 0 | |
| fringe.appendleft((nx, ny + 1)) | |
| return occupancy_map | |
| def init_flood_fill(robot_pos, obstacle_points, occ_grid_shape): | |
| """ | |
| center_point: center point | |
| obstacle_points: detected obstacles points (x,y) | |
| xy_points: (x,y) point pairs | |
| """ | |
| center_x, center_y = robot_pos | |
| prev_ix, prev_iy = center_x, center_y | |
| occupancy_map = (np.ones(occ_grid_shape)) * 0.5 | |
| # append first obstacle point to last | |
| obstacle_points = np.vstack((obstacle_points, obstacle_points[0])) | |
| for (x, y) in zip(obstacle_points[:,0], obstacle_points[:,1]): | |
| # x coordinate of the the occupied area | |
| ix = int(x) | |
| # y coordinate of the the occupied area | |
| iy = int(y) | |
| free_area = bresenham((prev_ix, prev_iy), (ix, iy)) | |
| for fa in free_area: | |
| occupancy_map[fa[0]][fa[1]] = 0 # free area 0.0 | |
| prev_ix = ix | |
| prev_iy = iy | |
| return occupancy_map | |
| show_animation = False | |
| def bresenham(start, end): | |
| """ | |
| Implementation of Bresenham's line drawing algorithm | |
| See en.wikipedia.org/wiki/Bresenham's_line_algorithm | |
| Bresenham's Line Algorithm | |
| Produces a np.array from start and end (original from roguebasin.com) | |
| >>> points1 = bresenham((4, 4), (6, 10)) | |
| >>> print(points1) | |
| np.array([[4,4], [4,5], [5,6], [5,7], [5,8], [6,9], [6,10]]) | |
| """ | |
| # setup initial conditions | |
| x1, y1 = start | |
| x2, y2 = end | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| is_steep = abs(dy) > abs(dx) # determine how steep the line is | |
| if is_steep: # rotate line | |
| x1, y1 = y1, x1 | |
| x2, y2 = y2, x2 | |
| # swap start and end points if necessary and store swap state | |
| swapped = False | |
| if x1 > x2: | |
| x1, x2 = x2, x1 | |
| y1, y2 = y2, y1 | |
| swapped = True | |
| dx = x2 - x1 # recalculate differentials | |
| dy = y2 - y1 # recalculate differentials | |
| error = int(dx / 2.0) # calculate error | |
| y_step = 1 if y1 < y2 else -1 | |
| # iterate over bounding box generating points between start and end | |
| y = y1 | |
| points = [] | |
| for x in range(x1, x2 + 1): | |
| coord = [y, x] if is_steep else (x, y) | |
| points.append(coord) | |
| error -= abs(dy) | |
| if error < 0: | |
| y += y_step | |
| error += dx | |
| if swapped: # reverse the list if the coordinates were swapped | |
| points.reverse() | |
| points = np.array(points) | |
| return points | |
| def get_free_points_in_front(occupancy_grid, robot_pos, laser_range=10, num_laser=100): | |
| """ | |
| Assumes circular lidar | |
| occupancy_grid: np.array (h x w) | |
| robot_pos: (x, y) | |
| Outputs: | |
| free_points: np.array of hit points (x, y) | |
| """ | |
| free_points = [] | |
| hit_points = [] # actual hit points + last bresenham point (for some reason need this for flodding) | |
| actual_hit_points = [] # | |
| for orientation in np.linspace(np.pi/2, 3*np.pi/2, num_laser): | |
| end_point = (round(robot_pos[0] + laser_range * np.cos(orientation)), round(robot_pos[1] + laser_range * np.sin(orientation))) | |
| # Get index along ray to check | |
| bresenham_points = (bresenham(robot_pos, end_point)) | |
| # Go through the points and see the first hit | |
| # TODO: do a check if any first? | |
| for i in range(len(bresenham_points)): | |
| # if bresenham point is in the map | |
| if bresenham_points[i,0] < 0 or bresenham_points[i,0] >= occupancy_grid.shape[0] or bresenham_points[i,1] < 0 or bresenham_points[i,1] >= occupancy_grid.shape[1]: | |
| if i != 0: | |
| hit_points.append(bresenham_points[i-1]) | |
| break # don't use this bresenham point | |
| if occupancy_grid[bresenham_points[i,0], bresenham_points[i,1]] == 1: # hit if it is void or occupied #! THINK IF THIS IS A GOOD ASSUMPTION | |
| for j in range(min(4, len(bresenham_points) - i - 1)): # add 4 points in front of hit | |
| free_points.append(bresenham_points[i+j]) | |
| actual_hit_points.append(bresenham_points[i + j + 1]) | |
| hit_points.append(bresenham_points[i + j + 1]) | |
| break | |
| else: # no hits | |
| free_point = bresenham_points[i] | |
| free_points.append(free_point) | |
| if i == len(bresenham_points) - 1: | |
| hit_points.append(end_point) # need to add this for proper flooding for vis mask | |
| break | |
| # Convert to np.array | |
| free_points = np.array(free_points) | |
| hit_points = np.array(hit_points) | |
| actual_hit_points = np.array(actual_hit_points) | |
| return free_points, hit_points, actual_hit_points | |
| if __name__ == "__main__": | |
| # Argparse | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--dataset_folder", type=str, default="/path/to/raycast") | |
| parser.add_argument("--class_idx_building", type=int, default=4) | |
| parser.add_argument("--num_workers", type=int, default=60) | |
| parser.add_argument("--location", type=str, default="los_angeles") | |
| args = parser.parse_args() | |
| dataset_folder = Path(args.dataset_folder) | |
| bev_folder = dataset_folder / args.location / "semantic_masks" | |
| output_folder = dataset_folder / args.location / "flood_fill" | |
| output_folder.mkdir(exist_ok=True, parents=True) | |
| def generate_mask(filepath): | |
| mask = np.load(filepath) | |
| building_grid = mask[..., args.class_idx_building] | |
| try: | |
| flooded_image = get_raycast_building_mask(building_grid) | |
| except: | |
| raise Exception(f"Error in {filepath}") | |
| if flooded_image is not None: | |
| output_file = output_folder / filepath.name | |
| np.save(output_file, flooded_image) | |
| else: | |
| print("No flood fill generated") | |
| bev_files = list(bev_folder.iterdir()) | |
| with Pool(args.num_workers) as p: | |
| for _ in tqdm(p.imap_unordered(generate_mask, bev_files), total=len(bev_files)): | |
| pass | |