| |
|
|
| import json |
| from pathlib import Path |
|
|
| import numpy as np |
| import httpx |
| import asyncio |
| from aiolimiter import AsyncLimiter |
| import tqdm |
| import requests |
| import mercantile |
| import geojson |
| import turfpy.measurement |
| from vt2geojson.tools import vt_bytes_to_geojson |
|
|
|
|
| from opensfm.pygeometry import Camera, Pose |
| from opensfm.pymap import Shot |
|
|
| from .. import logger |
| from .geo import Projection |
|
|
|
|
# Global cap on the number of concurrently running fetch coroutines.
semaphore = asyncio.Semaphore(100)
# Filename templates for cached image pixels and per-image metadata.
image_filename = "{image_id}.jpg"
info_filename = "{image_id}.json"
|
|
|
|
class MapillaryDownloader:
    """Asynchronous client for the Mapillary Graph and vector-tile APIs.

    All requests go through a shared rate limiter (half of the documented
    per-minute quota) and an httpx transport configured to retry transient
    connection failures.
    """

    # Fields requested from the Graph API for each image.
    image_fields = (
        "id",
        "height",
        "width",
        "camera_parameters",
        "camera_type",
        "captured_at",
        "compass_angle",
        "geometry",
        "altitude",
        "computed_compass_angle",
        "computed_geometry",
        "computed_altitude",
        "computed_rotation",
        "thumb_2048_url",
        "thumb_original_url",
        "sequence",
        "sfm_cluster",
        "creator",
        "make",
        "model",
        "is_pano",
        "quality_score",
        "exif_orientation",
    )
    image_info_url = (
        "https://graph.mapillary.com/{image_id}?access_token={token}&fields={fields}"
    )
    seq_info_url = "https://graph.mapillary.com/image_ids?access_token={token}&sequence_id={seq_id}"
    tile_info_url = "https://tiles.mapillary.com/maps/vtp/mly1_public/2/{z}/{x}/{y}?access_token={token}"
    max_requests_per_minute = 50_000

    def __init__(self, token: str):
        """Create a client authenticated with the given Mapillary API token."""
        self.token = token
        self.client = httpx.AsyncClient(
            transport=httpx.AsyncHTTPTransport(retries=20), timeout=600
        )
        # Stay well below the quota: use only half the allowed request rate.
        self.limiter = AsyncLimiter(self.max_requests_per_minute // 2, time_period=60)

    async def call_api(self, url: str):
        """Perform a rate-limited GET; log (but do not raise) on HTTP errors."""
        async with self.limiter:
            r = await self.client.get(url)
        if not r.is_success:
            logger.error("Error in API call: %s", r.text)
        return r

    async def get_tile_image_points(self, tile):
        """Return (tile, features) for the image layer of a vector tile.

        Returns (tile, None) when the request or decoding fails, so callers
        can retry the failed tile.
        """
        url = self.tile_info_url.format(
            x=tile.x,
            y=tile.y,
            z=tile.z,
            token=self.token,
        )
        try:
            r = await self.call_api(url)
            if r.is_success:
                # Use the public `content` accessor, not the private
                # `_content` attribute.
                geo_d = vt_bytes_to_geojson(
                    b_content=r.content,
                    x=tile.x,
                    y=tile.y,
                    z=tile.z,
                    layer="image",
                )
                return tile, geo_d["features"]
        except Exception as e:
            logger.error("%s: %s", type(e).__name__, e)
        return tile, None

    async def get_tiles_image_points(self, tiles, retries=3):
        """Fetch image features for many tiles, retrying failed tiles.

        Returns a dict mapping "z_{z}_x{x}_y{y}" keys to feature lists;
        tiles that still fail after `retries` attempts are skipped.
        """
        tile_to_images = {}
        tasks = [self.get_tile_image_points(t) for t in tiles]
        for i in range(retries):
            failed_tiles = []
            for task in tqdm.asyncio.tqdm.as_completed(tasks):
                tile, image_ids = await task
                if image_ids is not None:
                    tile_to_images[f"z_{tile.z}_x{tile.x}_y{tile.y}"] = image_ids
                else:
                    logger.error(
                        "Error when retrieving tile z_%s_x%s_y%s. "
                        "Image_ids is None. Skipping.",
                        tile.z, tile.x, tile.y,
                    )
                    failed_tiles.append(tile)
            if not failed_tiles:
                break
            if i == retries - 1:
                logger.error(
                    "Failed to retrieve %d tiles in attempt %d. "
                    "Maxed out retries. Skipping those tiles.",
                    len(failed_tiles), i,
                )
            else:
                logger.error(
                    "Failed to retrieve %d tiles in attempt %d. Trying again..",
                    len(failed_tiles), i,
                )
                tasks = [self.get_tile_image_points(t) for t in failed_tiles]
        return tile_to_images

    async def get_image_info(self, image_id: int):
        """Return the Graph API info dict for an image, or None on failure."""
        url = self.image_info_url.format(
            image_id=image_id,
            token=self.token,
            fields=",".join(self.image_fields),
        )
        r = await self.call_api(url)
        if r.is_success:
            return json.loads(r.text)

    async def get_sequence_info(self, seq_id: str):
        """Return the Graph API response for a sequence, or None on failure."""
        url = self.seq_info_url.format(seq_id=seq_id, token=self.token)
        r = await self.call_api(url)
        if r.is_success:
            return json.loads(r.text)

    async def download_image_pixels(self, url: str, path: Path):
        """Download an image to `path`; return True on success."""
        r = await self.call_api(url)
        if r.is_success:
            with open(path, "wb") as fid:
                fid.write(r.content)
        return r.is_success

    async def get_image_info_cached(self, image_id: int, path: Path):
        """Like get_image_info, but backed by a JSON cache file at `path`."""
        if path.exists():
            info = json.loads(path.read_text())
        else:
            info = await self.get_image_info(image_id)
            # Only cache successful lookups; writing None would store "null"
            # and permanently mask the failure on later runs.
            if info is not None:
                path.write_text(json.dumps(info))
        return info

    async def download_image_pixels_cached(self, url: str, path: Path):
        """Like download_image_pixels, but skip the download if `path` exists."""
        if path.exists():
            return True
        else:
            return await self.download_image_pixels(url, path)
|
|
|
|
async def fetch_images_in_sequence(i, downloader):
    """Resolve sequence id `i` to the list of integer image ids it contains."""
    async with semaphore:
        seq_info = await downloader.get_sequence_info(i)
        image_ids = [int(entry["id"]) for entry in seq_info["data"]]
        return i, image_ids
|
|
|
|
async def fetch_images_in_sequences(sequence_ids, downloader):
    """Concurrently map each sequence id to the image ids it contains."""
    pending = [fetch_images_in_sequence(s, downloader) for s in sequence_ids]
    seq_to_images_ids = {}
    for done in tqdm.asyncio.tqdm.as_completed(pending):
        seq_id, image_ids = await done
        seq_to_images_ids[seq_id] = image_ids
    return seq_to_images_ids
|
|
|
|
async def fetch_image_info(i, downloader, dir_):
    """Fetch metadata for image `i`, caching the JSON response in `dir_`.

    Returns (image_id, info); info is None if the API call failed.
    """
    async with semaphore:
        path = dir_ / info_filename.format(image_id=i)
        # Use the cached variant so repeated runs reuse on-disk metadata;
        # the original computed `path` but then ignored it and always hit
        # the API via get_image_info.
        info = await downloader.get_image_info_cached(i, path)
        return i, info
|
|
|
|
async def fetch_image_infos(image_ids, downloader, dir_):
    """Fetch metadata for many images; return (infos dict, failure count)."""
    infos = {}
    failures = 0
    pending = [fetch_image_info(i, downloader, dir_) for i in image_ids]
    for done in tqdm.asyncio.tqdm.as_completed(pending):
        image_id, info = await done
        if info is None:
            failures += 1
        else:
            infos[image_id] = info
    return infos, failures
|
|
|
|
async def fetch_image_pixels(i, url, downloader, dir_, overwrite=False):
    """Download image `i` from `url` into `dir_`; return (image_id, success)."""
    async with semaphore:
        target = dir_ / image_filename.format(image_id=i)
        if overwrite:
            # Remove any cached file so the download is not skipped.
            target.unlink(missing_ok=True)
        ok = await downloader.download_image_pixels_cached(url, target)
        return i, ok
|
|
|
|
async def fetch_images_pixels(image_urls, downloader, dir_):
    """Download many (id, url) images into `dir_`; return the failure count."""
    pending = [fetch_image_pixels(*pair, downloader, dir_) for pair in image_urls]
    failures = 0
    for done in tqdm.asyncio.tqdm.as_completed(pending):
        _, ok = await done
        if not ok:
            failures += 1
    return failures
|
|
|
|
def opensfm_camera_from_info(info: dict) -> Camera:
    """Build an OpenSfM Camera from a Mapillary image-info record.

    Raises ValueError for unsupported camera types.
    """
    cam_type = info["camera_type"]
    if cam_type == "perspective":
        cam = Camera.create_perspective(*info["camera_parameters"])
    elif cam_type == "fisheye":
        cam = Camera.create_fisheye(*info["camera_parameters"])
    elif Camera.is_panorama(cam_type):
        cam = Camera.create_spherical()
    else:
        raise ValueError(cam_type)
    cam.width = info["width"]
    cam.height = info["height"]
    cam.id = info["id"]
    return cam
|
|
|
|
def opensfm_shot_from_info(info: dict, projection: Projection) -> Shot:
    """Convert a Mapillary image-info record into (latlong, OpenSfM Shot).

    Uses the SfM-computed geometry, altitude, and rotation fields.
    """
    # `computed_geometry` is a nested GeoJSON Point as returned by the Graph
    # API (see MapillaryDownloader.get_image_info); the original indexed a
    # flat "computed_geometry.coordinates" key, which is not present in that
    # response. Coordinates are (long, lat), so reverse to (lat, long).
    latlong = info["computed_geometry"]["coordinates"][::-1]
    alt = info["computed_altitude"]
    xyz = projection.project(np.array([*latlong, alt]), return_z=True)
    c_rotvec_w = np.array(info["computed_rotation"])
    pose = Pose()
    # Negating the axis-angle vector inverts the rotation.
    # NOTE(review): assumes `computed_rotation` is world-to-camera — confirm.
    pose.set_from_cam_to_world(-c_rotvec_w, xyz)
    camera = opensfm_camera_from_info(info)
    return latlong, Shot(info["id"], camera, pose)
|
|
|
|
def get_city_boundary(city, state=None, country=None, fetch_shape=False):
    """Query Nominatim for a city's bounding box and optionally its shape.

    Returns:
        A bbox dict with 'west'/'south'/'east'/'north' floats, or
        (bbox, FeatureCollection) when fetch_shape is True, or
        None when the request fails or no result is found.
    """
    base_url = "https://nominatim.openstreetmap.org/search"
    params = {
        'city': city,
        'state': state,
        'country': country,
        'format': 'json',
        'limit': 1,
        'polygon_geojson': 1 if fetch_shape else 0,
    }
    # Nominatim's usage policy requires an identifying User-Agent.
    headers = {
        'User-Agent': f'mapperceptionnet_{city}_{state}'
    }
    # requests has no default timeout; avoid hanging forever.
    response = requests.get(base_url, params=params, headers=headers, timeout=60)

    if response.status_code != 200:
        logger.error(f"Nominatim error when fetching boundary data for {city}, {state}.\n"
                     f"Status code: {response.status_code}. Content: {response.content}")
        return None

    data = response.json()

    # Nominatim returns an empty list (not None) when nothing matches;
    # checking `data is None` would let data[0] below raise IndexError.
    if not data:
        logger.warning(f"No data returned by Nominatim for {city}, {state}")
        return None

    # boundingbox comes back as [south, north, west, east] strings.
    bbox_data = data[0]['boundingbox']
    bbox = {
        'west': float(bbox_data[2]),
        'south': float(bbox_data[0]),
        'east': float(bbox_data[3]),
        'north': float(bbox_data[1]),
    }

    if not fetch_shape:
        return bbox

    # Wrap the raw geometry in a single-feature FeatureCollection.
    boundary_geojson = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "properties": {},
                "geometry": data[0]['geojson'],
            }
        ],
    }
    return bbox, boundary_geojson
|
|
|
|
def get_tiles_from_boundary(boundary_info, zoom=14):
    """Enumerate the slippy-map tiles covering a boundary at `zoom`.

    `boundary_info` either carries an explicit 'bbox' dict, or (when
    bound_type == "auto_shape") a GeoJSON FeatureCollection whose bounding
    box is computed here.
    """
    if boundary_info["bound_type"] == "auto_shape":
        geojson_shape = boundary_info["shape"]
        coords = geojson_shape["features"][0]["geometry"]["coordinates"]
        try:
            polygon = geojson.Polygon(coords)
            coordinates = turfpy.measurement.bbox(polygon)
        except Exception:
            # A bare `except:` would also swallow SystemExit/KeyboardInterrupt.
            # Multi-polygon boundary: fall back to the first polygon only.
            logger.warning(f"Boundary is defined by {len(coords)} polygons. Choosing first polygon blindly")
            polygon = geojson.Polygon(coords[0])
            coordinates = turfpy.measurement.bbox(polygon)

        # turfpy returns [west, south, east, north].
        coordinates = dict(zip(["west", "south", "east", "north"], coordinates))
    else:
        coordinates = boundary_info["bbox"]

    tiles = list(
        mercantile.tiles(
            **coordinates,
            zooms=zoom,
        )
    )
    return tiles