Spaces:
Paused
Paused
| # Adapted from prepare.py | |
| import asyncio | |
| import argparse | |
| from collections import defaultdict | |
| import json | |
| import shutil | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import numpy as np | |
| import cv2 | |
| from tqdm import tqdm | |
| from tqdm.contrib.concurrent import thread_map | |
| from omegaconf import DictConfig, OmegaConf | |
| from opensfm.pygeometry import Camera | |
| from opensfm.pymap import Shot | |
| from opensfm.undistort import ( | |
| perspective_camera_from_fisheye, | |
| perspective_camera_from_perspective, | |
| ) | |
| from .. import logger | |
| # from ...osm.tiling import TileManager | |
| # from ...osm.viz import GeoPlotter | |
| from .geo import BoundaryBox, Projection | |
| from .utils import decompose_rotmat | |
| from .utils_sfm import ( | |
| keyframe_selection, | |
| perspective_camera_from_pano, | |
| scale_camera, | |
| CameraUndistorter, | |
| PanoramaUndistorter, | |
| undistort_shot, | |
| ) | |
| from .download import ( | |
| opensfm_shot_from_info, | |
| image_filename, | |
| ) | |
# Default preprocessing options.
default_cfg = OmegaConf.create(
    dict(
        # Forwarded as the size bound when building the undistorted camera
        # (see process_shot).
        max_image_size=512,
        # Selects the legacy seeding scheme in get_pano_offset.
        do_legacy_pano_offset=True,
        # Forwarded to keyframe_selection as `min_dist`
        # (presumably meters -- TODO confirm against keyframe_selection).
        min_dist_between_keyframes=4,
        # Not read by the functions visible in this part of the file.
        # NOTE(review): `ppm` looks like "pixels per meter" -- confirm.
        tiling=dict(
            tile_size=128,
            margin=128,
            ppm=2,
        ),
    )
)
def get_pano_offset(image_info: dict, do_legacy: bool = False) -> float:
    """Draw a reproducible pseudo-random offset in [-45, 45) for a panorama.

    The offset is sampled from a ``RandomState`` seeded from the image
    metadata, so the same metadata always yields the same offset.

    Args:
        image_info: metadata of the image. In legacy mode the SfM cluster id
            is read from either the flattened key ``"sfm_cluster.id"`` (the
            layout used by ``pack_shot_dict``) or the nested
            ``["sfm_cluster"]["id"]``. Otherwise ``"sequence"`` is used.
        do_legacy: seed from the SfM cluster id instead of the sequence id.

    Returns:
        The sampled offset (presumably degrees -- TODO confirm with
        ``undistort_shot``).

    Raises:
        KeyError: if the required metadata key is missing.
    """
    import hashlib

    if do_legacy:
        # Accept both the flattened and the nested metadata layouts.
        if "sfm_cluster.id" in image_info:
            cluster_id = image_info["sfm_cluster.id"]
        else:
            cluster_id = image_info["sfm_cluster"]["id"]
        seed = int(cluster_id)
    else:
        sequence = image_info["sequence"]
        if isinstance(sequence, str):
            # The builtin str hash is salted per interpreter run
            # (PYTHONHASHSEED), which would make the offset change between
            # runs; use a stable digest instead.
            digest = hashlib.sha256(sequence.encode("utf-8")).digest()
            seed = int.from_bytes(digest[:8], "big")
        else:
            seed = hash(sequence)
    # RandomState only accepts seeds that fit in 32 bits.
    seed = seed % (2**32 - 1)
    return np.random.RandomState(seed).uniform(-45, 45)
def process_shot(
    shot: Shot, info: dict, image_path: Path, output_dir: Path, cfg: DictConfig
) -> List[Shot]:
    """Undistort one image and write the resulting view(s) to ``output_dir``.

    Args:
        shot: shot whose camera describes the original (distorted) image.
            Its camera's ``width``/``height`` are overwritten with the size
            of the image read from disk.
        info: metadata of the image; only used to derive the panorama offset.
        image_path: location of the original image.
        output_dir: directory receiving one ``<shot.id>.jpg`` per output view.
        cfg: configuration providing ``max_image_size`` and
            ``do_legacy_pano_offset``.

    Returns:
        The undistorted shots returned by ``undistort_shot``, or ``None`` if
        the image is missing or cannot be decoded.

    Raises:
        NotImplementedError: if the camera projection is neither a panorama,
            ``"fisheye"`` nor ``"perspective"``.
    """
    if not image_path.exists():
        logger.warn(f"Image {image_path} does not exist !")
        return None
    image_orig = cv2.imread(str(image_path))
    if image_orig is None:
        # cv2.imread returns None (instead of raising) when the file cannot
        # be decoded; skip it like a missing file rather than crashing below.
        logger.warn(f"Image {image_path} could not be read !")
        return None

    max_size = cfg.max_image_size
    pano_offset = None
    camera = shot.camera
    # Image arrays are (height, width, ...); reverse to (width, height).
    camera.width, camera.height = image_orig.shape[:2][::-1]

    if camera.is_panorama(camera.projection_type):
        camera_new = perspective_camera_from_pano(camera, max_size)
        undistorter = PanoramaUndistorter(camera, camera_new)
        pano_offset = get_pano_offset(info, cfg.do_legacy_pano_offset)
    elif camera.projection_type in ["fisheye", "perspective"]:
        if camera.projection_type == "fisheye":
            camera_new = perspective_camera_from_fisheye(camera)
        else:
            camera_new = perspective_camera_from_perspective(camera)
        camera_new = scale_camera(camera_new, max_size)
        camera_new.id = camera.id + "_undistorted"
        undistorter = CameraUndistorter(camera, camera_new)
    else:
        raise NotImplementedError(camera.projection_type)

    shots_undist, images_undist = undistort_shot(
        image_orig, shot, undistorter, pano_offset
    )
    # Use distinct loop names so the `shot` parameter is not shadowed.
    for shot_undist, image_undist in zip(shots_undist, images_undist):
        out_path = output_dir / f"{shot_undist.id}.jpg"
        if not cv2.imwrite(str(out_path), image_undist):
            logger.warn(f"Could not write image {out_path} !")
    return shots_undist
def pack_shot_dict(shot: Shot, info: dict) -> dict:
    """Collect the pose and metadata of one view into a plain dictionary.

    Args:
        shot: shot providing the camera id and the camera-to-world pose.
        info: image metadata addressed with flattened keys such as
            ``"computed_geometry.coordinates"`` and ``"sfm_cluster.id"``.

    Returns:
        A dict with the keys ``camera_id``, ``latlong``, ``t_c2w``, ``R_c2w``,
        ``roll_pitch_yaw``, ``capture_time``, ``gps_position``,
        ``compass_angle`` and ``chunk_id``.
    """
    # Both coordinate pairs are stored in reversed order
    # (presumably lon/lat -> lat/lon -- TODO confirm).
    computed_latlong = info["computed_geometry.coordinates"][::-1]
    raw_latlong = info["geometry.coordinates"][::-1]

    pose = shot.pose
    cam_origin = pose.get_origin()
    cam_to_world = pose.get_R_cam_to_world()
    angles = decompose_rotmat(cam_to_world)

    return {
        "camera_id": shot.camera.id,
        "latlong": computed_latlong,
        "t_c2w": cam_origin,
        "R_c2w": cam_to_world,
        "roll_pitch_yaw": angles,
        "capture_time": info["captured_at"],
        # Raw GPS coordinates with the altitude appended as last element.
        "gps_position": np.r_[raw_latlong, info["altitude"]],
        "compass_angle": info["compass_angle"],
        "chunk_id": int(info["sfm_cluster.id"]),
    }
def pack_camera_dict(camera: Camera) -> dict:
    """Describe a perspective camera as a ``PINHOLE`` model dictionary.

    Args:
        camera: camera whose ``projection_type`` must be ``"perspective"``.

    Returns:
        A dict with ``id``, ``model``, ``width``, ``height`` and ``params``,
        where ``params`` holds the entries K[0, 0], K[1, 1], K[0, 2] and
        K[1, 2] of the pixel-coordinate calibration matrix, in that order.
    """
    assert camera.projection_type == "perspective"
    width, height = camera.width, camera.height
    calibration = camera.get_K_in_pixel_coordinates(width, height)
    # Paired row/column indices select (0,0), (1,1), (0,2) and (1,2).
    rows = [0, 1, 0, 1]
    cols = [0, 1, 2, 2]
    return {
        "id": camera.id,
        "model": "PINHOLE",
        "width": width,
        "height": height,
        "params": calibration[rows, cols],
    }
def process_sequence(
    image_ids: List[int],
    image_infos: dict,
    projection: Projection,
    cfg: DictConfig,
    raw_image_dir: Path,
    out_image_dir: Path,
):
    """Select keyframes among the given images, undistort them and pack them.

    Args:
        image_ids: ids of the images to process; used as keys of
            ``image_infos``.
        image_infos: per-image metadata, providing at least ``"captured_at"``
            and ``"sequence"``.
        projection: forwarded to ``opensfm_shot_from_info``.
        cfg: configuration providing ``min_dist_between_keyframes`` plus the
            options read by ``process_shot``.
        raw_image_dir: directory holding the original images.
        out_image_dir: directory receiving the undistorted images.

    Returns:
        A tuple ``(dump, processed_ids)``. ``dump`` maps a sequence key to
        ``{"views": {...}, "cameras": {...}}``; ``processed_ids`` holds one
        entry per written view (the view id without its last ``_`` suffix).
    """
    dump = {}
    processed_ids = []
    if len(image_ids) == 0:
        return dump, processed_ids

    # Build the candidate shots in capture-time order.
    ordered_ids = sorted(
        image_ids, key=lambda image_id: image_infos[image_id]["captured_at"]
    )
    candidates = []
    for image_id in ordered_ids:
        _, candidate = opensfm_shot_from_info(image_infos[image_id], projection)
        candidates.append(candidate)

    keep = keyframe_selection(candidates, min_dist=cfg.min_dist_between_keyframes)
    keyframes = [candidates[k] for k in keep]

    def _undistort(keyframe):
        # Thin adapter so thread_map can call process_shot with one argument.
        return process_shot(
            keyframe,
            image_infos[keyframe.id],
            raw_image_dir / image_filename.format(image_id=keyframe.id),
            out_image_dir,
            cfg,
        )

    results = thread_map(_undistort, keyframes, disable=True)
    # process_shot yields None for images that could not be processed.
    results = [views for views in results if views is not None]

    # `index` counts successfully processed keyframes; all views that come
    # from the same keyframe share it.
    for index, views in enumerate(results):
        for view_shot in views:
            source_id, suffix = view_shot.id.rsplit("_", 1)
            processed_ids.append(source_id)
            info = image_infos[source_id]

            seq_key = info["sequence"]
            # Views whose suffix is not "undistorted" are treated as panorama
            # crops and grouped per suffix.
            if not suffix.endswith("undistorted"):
                seq_key = seq_key + f"_{suffix}"

            entry = dump.setdefault(seq_key, {"views": {}, "cameras": {}})
            view = pack_shot_dict(view_shot, info)
            view["index"] = index
            entry["views"][view_shot.id] = view
            entry["cameras"][view_shot.camera.id] = pack_camera_dict(
                view_shot.camera
            )
    return dump, processed_ids