| | |
| |
|
| | import asyncio |
| | import argparse |
| | from collections import defaultdict |
| | import json |
| | import shutil |
| | from pathlib import Path |
| | from typing import List, Dict |
| |
|
| | import numpy as np |
| | import cv2 |
| | from tqdm import tqdm |
| | from tqdm.contrib.concurrent import thread_map |
| | from omegaconf import DictConfig, OmegaConf |
| | from opensfm.pygeometry import Camera |
| | from opensfm.pymap import Shot |
| | from opensfm.undistort import ( |
| | perspective_camera_from_fisheye, |
| | perspective_camera_from_perspective, |
| | ) |
| |
|
| | from .. import logger |
| | |
| | |
| | from .geo import BoundaryBox, Projection |
| | from .utils import decompose_rotmat |
| | from .utils_sfm import ( |
| | keyframe_selection, |
| | perspective_camera_from_pano, |
| | scale_camera, |
| | CameraUndistorter, |
| | PanoramaUndistorter, |
| | undistort_shot, |
| | ) |
| | from .download import ( |
| | opensfm_shot_from_info, |
| | image_filename, |
| | ) |
| |
|
| |
|
| | default_cfg = OmegaConf.create( |
| | { |
| | "max_image_size": 512, |
| | "do_legacy_pano_offset": True, |
| | "min_dist_between_keyframes": 4, |
| | "tiling": { |
| | "tile_size": 128, |
| | "margin": 128, |
| | "ppm": 2, |
| | }, |
| | } |
| | ) |
| |
|
| |
|
| | def get_pano_offset(image_info: dict, do_legacy: bool = False) -> float: |
| | if do_legacy: |
| | seed = int(image_info["sfm_cluster"]["id"]) |
| | else: |
| | seed = image_info["sequence"].__hash__() |
| | seed = seed % (2**32 - 1) |
| | return np.random.RandomState(seed).uniform(-45, 45) |
| |
|
| |
|
| | def process_shot( |
| | shot: Shot, info: dict, image_path: Path, output_dir: Path, cfg: DictConfig |
| | ) -> List[Shot]: |
| | if not image_path.exists(): |
| | logger.warn(f"Image {image_path} does not exist !") |
| | return None |
| |
|
| | image_orig = cv2.imread(str(image_path)) |
| | max_size = cfg.max_image_size |
| | pano_offset = None |
| |
|
| | camera = shot.camera |
| | camera.width, camera.height = image_orig.shape[:2][::-1] |
| | if camera.is_panorama(camera.projection_type): |
| | camera_new = perspective_camera_from_pano(camera, max_size) |
| | undistorter = PanoramaUndistorter(camera, camera_new) |
| | pano_offset = get_pano_offset(info, cfg.do_legacy_pano_offset) |
| | elif camera.projection_type in ["fisheye", "perspective"]: |
| | if camera.projection_type == "fisheye": |
| | camera_new = perspective_camera_from_fisheye(camera) |
| | else: |
| | camera_new = perspective_camera_from_perspective(camera) |
| | camera_new = scale_camera(camera_new, max_size) |
| | camera_new.id = camera.id + "_undistorted" |
| | undistorter = CameraUndistorter(camera, camera_new) |
| | else: |
| | raise NotImplementedError(camera.projection_type) |
| |
|
| | shots_undist, images_undist = undistort_shot( |
| | image_orig, shot, undistorter, pano_offset |
| | ) |
| | for shot, image in zip(shots_undist, images_undist): |
| | cv2.imwrite(str(output_dir / f"{shot.id}.jpg"), image) |
| |
|
| | return shots_undist |
| |
|
| |
|
| | def pack_shot_dict(shot: Shot, info: dict) -> dict: |
| | latlong = info["computed_geometry.coordinates"][::-1] |
| | latlong_gps = info["geometry.coordinates"][::-1] |
| | w_p_c = shot.pose.get_origin() |
| | w_r_c = shot.pose.get_R_cam_to_world() |
| | rpy = decompose_rotmat(w_r_c) |
| | return dict( |
| | camera_id=shot.camera.id, |
| | latlong=latlong, |
| | t_c2w=w_p_c, |
| | R_c2w=w_r_c, |
| | roll_pitch_yaw=rpy, |
| | capture_time=info["captured_at"], |
| | gps_position=np.r_[latlong_gps, info["altitude"]], |
| | compass_angle=info["compass_angle"], |
| | chunk_id=int(info["sfm_cluster.id"]), |
| | ) |
| |
|
| |
|
| | def pack_camera_dict(camera: Camera) -> dict: |
| | assert camera.projection_type == "perspective" |
| | K = camera.get_K_in_pixel_coordinates(camera.width, camera.height) |
| | return dict( |
| | id=camera.id, |
| | model="PINHOLE", |
| | width=camera.width, |
| | height=camera.height, |
| | params=K[[0, 1, 0, 1], [0, 1, 2, 2]], |
| | ) |
| |
|
| |
|
| | def process_sequence( |
| | image_ids: List[int], |
| | image_infos: dict, |
| | projection: Projection, |
| | cfg: DictConfig, |
| | raw_image_dir: Path, |
| | out_image_dir: Path, |
| | ): |
| | shots = [] |
| | dump = {} |
| | processed_ids = list() |
| | if len(image_ids) == 0: |
| | return dump, processed_ids |
| |
|
| | image_ids = sorted(image_ids, key=lambda i: image_infos[i]["captured_at"]) |
| | for i in image_ids: |
| | _, shot = opensfm_shot_from_info(image_infos[i], projection) |
| | shots.append(shot) |
| | shot_idxs = keyframe_selection(shots, min_dist=cfg.min_dist_between_keyframes) |
| | shots = [shots[i] for i in shot_idxs] |
| |
|
| | shots_out = thread_map( |
| | lambda shot: process_shot( |
| | shot, |
| | image_infos[shot.id], |
| | raw_image_dir / image_filename.format(image_id=shot.id), |
| | out_image_dir, |
| | cfg, |
| | ), |
| | shots, |
| | disable=True, |
| | ) |
| | shots_out = [s for s in shots_out if s is not None] |
| | shots_out = [(i, s) for i, ss in enumerate(shots_out) for s in ss if ss is not None] |
| |
|
| | for index, shot in shots_out: |
| | i, suffix = shot.id.rsplit("_", 1) |
| | processed_ids.append(i) |
| | info = image_infos[i] |
| | seq_id = info["sequence"] |
| | is_pano = not suffix.endswith("undistorted") |
| | if is_pano: |
| | seq_id += f"_{suffix}" |
| | if seq_id not in dump: |
| | dump[seq_id] = dict(views={}, cameras={}) |
| |
|
| | view = pack_shot_dict(shot, info) |
| | view["index"] = index |
| | dump[seq_id]["views"][shot.id] = view |
| | dump[seq_id]["cameras"][shot.camera.id] = pack_camera_dict(shot.camera) |
| | return dump, processed_ids |