| | """python3.9 -m mia.fpv.get_fpv --cfg mia/conf/example.yaml""" |

import argparse
import itertools
import traceback
from functools import partial
from typing import Dict
from pathlib import Path
import tracemalloc
import copy
import json

import numpy as np
import asyncio
from tqdm import tqdm
from omegaconf import OmegaConf
import pandas as pd

from .. import logger
from .geo import Projection

from .download import (
    MapillaryDownloader,
    fetch_image_infos,
    fetch_images_pixels,
    get_city_boundary,
    get_tiles_from_boundary,
)
from .prepare import process_sequence, default_cfg
from .filters import in_shape_filter, FilterPipeline

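
# json.JSONEncoder subclass that converts numpy arrays and scalars to native
# Python types, so metadata containing numpy values can be dumped as-is.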
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.generic):
            return obj.item()
        return json.JSONEncoder.default(self, obj)


def write_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, cls=JSONEncoder)

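
# The token argument is either the raw token string or a path to a file
# containing it; valid Mapillary tokens are expected to start with "MLY".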
def get_token(token: str) -> str:
    if Path(token).is_file():
        logger.info(f"Reading token from file {token}")
        with open(token, "r") as file:
            token = file.read().strip()

    if not token.startswith("MLY"):
        logger.fatal(f"The token '{token}' is invalid")
        exit(1)
    else:
        logger.info(f"Using token {token}")
    return token


def fetch_city_boundaries(cities: list):
    """
    Args:
        cities: List of dictionaries describing the cities/regions to fetch, in the fpv.yaml format.
    """
    data = []
    pbar = tqdm(cities)
    for loc_info in pbar:
        loc_fmt = loc_info["name"]

        if "state" in loc_info:
            loc_fmt = f"{loc_fmt}, {loc_info['state']}"
        else:
            loc_info["state"] = ""

        if "country" in loc_info:
            loc_fmt = f"{loc_fmt}, {loc_info['country']}"
        else:
            loc_info["country"] = ""

        pbar.set_description(f"Getting boundary for {loc_fmt}")
        entry = copy.copy(dict(loc_info))

        get_city_boundary_ = partial(
            get_city_boundary, loc_info["name"], loc_info["state"], loc_info["country"]
        )
        if "bound_type" not in loc_info:
            assert "sequence_ids" in loc_info
            raise NotImplementedError()
        elif loc_info["bound_type"] == "custom_bbox":
            assert "custom_bbox" in loc_info
            entry["bbox"] = dict(zip(["west", "south", "east", "north"],
                                     [float(x) for x in loc_info["custom_bbox"].split(",")]))
        elif loc_info["bound_type"] == "auto_shape":
            entry["bbox"], entry["shape"] = get_city_boundary_(fetch_shape=True)
        elif loc_info["bound_type"] == "auto_bbox":
            entry["bbox"] = get_city_boundary_(fetch_shape=False)
        elif loc_info["bound_type"] == "custom_size":
            assert "custom_size" in loc_info
            custom_size = loc_info["custom_size"]
            bbox = get_city_boundary_(fetch_shape=False)
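            # Expand the box custom_size km in each direction from the bbox
            # center: one degree of latitude is ~111.32 km, and one degree of
            # longitude is ~111.32 km scaled by cos(latitude).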
            bbox_center = [(bbox['west'] + bbox['east']) / 2, (bbox['south'] + bbox['north']) / 2]
            bbox['west'] = bbox_center[0] - custom_size / (111.32 * np.cos(np.deg2rad(bbox_center[1])))
            bbox['east'] = bbox_center[0] + custom_size / (111.32 * np.cos(np.deg2rad(bbox_center[1])))
            bbox['south'] = bbox_center[1] - custom_size / 111.32
            bbox['north'] = bbox_center[1] + custom_size / 111.32
            entry["bbox"] = bbox
            entry["custom_size"] = custom_size
        else:
            raise Exception(f"Unsupported bound_type '{loc_info['bound_type']}'")

        data.append(entry)
    return data

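
# Flatten a list of GeoJSON features into a dataframe with pd.json_normalize,
# optionally splitting point coordinates into separate long/lat columns.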
def geojson_feature_list_to_pandas(feature_list, split_coords=True):
    t = pd.json_normalize(feature_list)
    cols_to_drop = ["type", "geometry.type", "properties.organization_id", "computed_geometry.type"]
    if split_coords:
        t[['geometry.long', 'geometry.lat']] = pd.DataFrame(t["geometry.coordinates"].tolist(), index=t.index)

        if "computed_geometry.coordinates" in t.columns:
            t["computed_geometry.long"] = t["computed_geometry.coordinates"].map(lambda x: x if isinstance(x, float) else x[0])
            t["computed_geometry.lat"] = t["computed_geometry.coordinates"].map(lambda x: x if isinstance(x, float) else x[1])

    t.drop(columns=cols_to_drop, inplace=True, errors="ignore")
    t.columns = t.columns.str.removeprefix('properties.')
    t["id"] = t["id"].astype(str)
    return t


def parse_image_points_json_data(rd: dict, combine=True) -> pd.DataFrame:
    """
    Parse the per-tile JSON responses into pandas dataframes
    (combined into a single dataframe if combine=True).
    """
    df_dict = dict()
    for tile, feature_list in tqdm(rd.items(), total=len(rd)):
        if len(feature_list) == 0:
            continue
        df_dict[tile] = geojson_feature_list_to_pandas(feature_list)

    if combine:
        logger.info("Joining all dataframes into one.")
        return pd.concat(df_dict.values())
    else:
        return df_dict

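
# tracemalloc.start() is called at the top of main(), so these counters only
# cover allocations made after that point.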
def log_memory_usage():
    current, peak = tracemalloc.get_traced_memory()
    current_gb = current / 10**9
    peak_gb = peak / 10**9
    logger.info(f"Current memory: {current_gb:.3f} GB; peak was {peak_gb:.3f} GB")


def main(args, cfgs):
    pipeline = FilterPipeline.load_from_yaml(cfgs.fpv_options.filter_pipeline_cfg)

    tracemalloc.start()
    token = get_token(args.token)
    downloader = MapillaryDownloader(token)
    loop = asyncio.get_event_loop()

    dataset_dir = Path(cfgs.dataset_dir)
    dataset_dir.mkdir(exist_ok=True, parents=True)

    logger.info("Auto-fetching boundaries for cities if needed.")
    cities_bounds_info = fetch_city_boundaries(cfgs.cities)

    log_memory_usage()

    for city_boundary_info in cities_bounds_info:
        df = None
        df_meta = None
        df_meta_filtered = None
        df_meta_filtered_processed = None

        logger.info(f"Processing {city_boundary_info['name']}")

        location_name = city_boundary_info['name'].lower().replace(" ", "_")
        location_dir = dataset_dir / location_name
        infos_dir = location_dir / "image_infos_chunked"
        raw_image_dir = location_dir / "images_raw"
        out_image_dir = location_dir / "images"
        for d in (infos_dir, raw_image_dir, out_image_dir, location_dir):
            if not d.exists():
                logger.info(f"{d} does not exist. Creating directory {d}")
                d.mkdir(parents=True, exist_ok=True)
        write_json(location_dir / "boundary_info.json", city_boundary_info)

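        # Stage 1: cover the boundary with zoom-14 map tiles and download the
        # image points (id + coordinates) that Mapillary reports for each tile.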
        if cfgs.fpv_options.stages.get_image_points_from_tiles:
            logger.info(f"[{location_name}] Stage 1 (Downloading image IDs) ------------------")
            tiles = get_tiles_from_boundary(city_boundary_info)
            logger.info(f"[{location_name}] Found {len(tiles)} zoom-14 tiles for this boundary. Starting image point download")
            image_points_response = loop.run_until_complete(
                downloader.get_tiles_image_points(tiles)
            )
            if image_points_response is None:
                logger.warning(f"[{location_name}] No image points found in boundary. Skipping city")
                continue
            write_json(location_dir / 'images_points_dump.json', image_points_response)

            logger.info(f"[{location_name}] Parsing image point json data into dataframe")
            df = parse_image_points_json_data(image_points_response)

            if city_boundary_info["bound_type"] == "auto_shape":
                old_count = df.shape[0]
                df = df[in_shape_filter(df, city_boundary_info["shape"])]
                new_count = df.shape[0]
                logger.info(f"[{location_name}] Keeping {new_count}/{old_count} ({new_count/old_count*100:.2f}%) "
                            "points that are within the city boundary")
            df.to_parquet(location_dir / 'image_points.parquet')

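        # Stage 2: download per-image metadata in chunks of
        # metadata_download_chunk_size points. Each chunk is written to its own
        # parquet file so an interrupted run can resume from what is on disk.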
        if cfgs.fpv_options.stages.get_metadata:
            logger.info(f"[{location_name}] Stage 2 (Downloading Metadata) ------------------")
            if df is None:
                pq_name = 'image_points.parquet'
                df = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df.shape[0]} image points from {pq_name}")
            log_memory_usage()

            chunk_size = cfgs.fpv_options.metadata_download_chunk_size
            num_split = int(np.ceil(df.shape[0] / chunk_size))
            logger.info(f"[{location_name}] Splitting the {df.shape[0]} image points into {num_split} chunks of {chunk_size} image points each.")

            num_downloaded_chunks = 0
            num_of_chunks_in_dir = len(list(infos_dir.glob("image_metadata_chunk_*.parquet")))
            df_meta_chunks = list()
            df_meta = pd.DataFrame()
            if infos_dir.exists() and num_of_chunks_in_dir > 0:
                logger.info(f"[{location_name}] Found {num_of_chunks_in_dir} existing metadata chunks.")
                num_downloaded_data_pts = 0
                pbar = tqdm(infos_dir.glob("image_metadata_chunk_*.parquet"), total=num_of_chunks_in_dir)
                for chunk_fp in pbar:
                    pbar.set_description(f"Loading {chunk_fp}")
                    chunk_df = pd.read_parquet(chunk_fp)
                    df_meta_chunks.append(chunk_df)
                    num_downloaded_chunks += 1
                    num_downloaded_data_pts += len(chunk_df)
                log_memory_usage()

                num_pts_left = df.shape[0] - num_downloaded_data_pts

                df_meta = pd.concat(df_meta_chunks)
                df_meta_chunks.clear()
                df = df[~df["id"].isin(df_meta["id"])]

                left_num_split = int(np.ceil(df.shape[0] / chunk_size))

                if num_pts_left != len(df):
                    raise ValueError(f"Number of points left {num_pts_left} does not match the number of points in the dataframe {len(df)}")

                if num_pts_left > 0:
                    logger.info(f"Restarting metadata download with {num_pts_left} points, {left_num_split} chunks left to download.")

            num_split = int(np.ceil(df.shape[0] / chunk_size))
            groups = df.groupby(np.arange(len(df.index)) // chunk_size)

            for (frame_num, frame) in groups:
                frame_num = frame_num + num_downloaded_chunks
                logger.info(f"[{location_name}] Fetching metadata for chunk {frame_num+1}/{num_downloaded_chunks + num_split} ({frame.shape[0]} image points).")
                image_ids = frame["id"]
                image_infos, num_fail = loop.run_until_complete(
                    fetch_image_infos(image_ids, downloader, infos_dir)
                )
                logger.info("%d failures (%.1f%%).", num_fail, 100 * num_fail / len(image_ids))
                if num_fail == len(image_ids):
                    logger.warning(f"[{location_name}] All images failed to be fetched. Skipping next steps")
                    continue
                new_df_meta = geojson_feature_list_to_pandas(image_infos.values())
                df_meta_chunks.append(new_df_meta)
                new_df_meta.to_parquet(infos_dir / f'image_metadata_chunk_{frame_num}.parquet')
                log_memory_usage()

            df_meta = pd.concat([df_meta] + df_meta_chunks)
            df_meta_chunks.clear()

            df_meta["model"] = df_meta["model"].str.lower().str.replace(' ', '').str.replace('_', '')
            df_meta["make"] = df_meta["make"].str.lower().str.replace(' ', '').str.replace('_', '')
            df_meta.to_parquet(location_dir / 'image_metadata.parquet')

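        # Stage 3: apply the configured FilterPipeline to keep only the images
        # worth downloading.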
        if cfgs.fpv_options.stages.run_filter:
            logger.info(f"[{location_name}] Stage 3 (Filtering) ------------------")
            if df_meta is None:
                pq_name = 'image_metadata.parquet'
                df_meta = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta.shape[0]} image metadata rows from {pq_name}")

            df_meta_filtered = pipeline(df_meta)
            df_meta_filtered.to_parquet(location_dir / 'image_metadata_filtered.parquet')
            if df_meta_filtered.shape[0] == 0:
                logger.warning(f"[{location_name}] No images to download. Moving on to next location.")
                continue
            else:
                logger.info(f"[{location_name}] {df_meta_filtered.shape[0]} images to download.")

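        # Stage 4: download the 2048px thumbnails of all filtered images that
        # are not already present in images_raw/.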
        if cfgs.fpv_options.stages.download_images:
            logger.info(f"[{location_name}] Stage 4 (Downloading Images) ------------------")
            if df_meta_filtered is None:
                pq_name = 'image_metadata_filtered.parquet'
                df_meta_filtered = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta_filtered.shape[0]} image metadata rows from {pq_name}")
            log_memory_usage()

            downloaded_image_fps = list(raw_image_dir.glob("*.jpg"))
            downloaded_image_ids = [fp.stem for fp in downloaded_image_fps]
            df_to_download = df_meta_filtered[~df_meta_filtered["id"].isin(downloaded_image_ids)]
            logger.info(f"[{location_name}] {len(downloaded_image_ids)} images already downloaded. {df_to_download.shape[0]} images left to download.")

            image_urls = list(df_to_download.set_index("id")["thumb_2048_url"].items())
            if len(image_urls) > 0:
                num_fail = loop.run_until_complete(
                    fetch_images_pixels(image_urls, downloader, raw_image_dir)
                )
                logger.info("%d failures (%.1f%%).", num_fail, 100 * num_fail / len(image_urls))

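        # Stage 5: group images by sequence and run process_sequence on each,
        # using a local metric projection centered on the boundary bbox.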
        if cfgs.fpv_options.stages.to_process_sequence:
            logger.info(f"[{location_name}] Stage 5 (Sequence Processing) ------------------")
            if df_meta_filtered is None:
                pq_name = 'image_metadata_filtered.parquet'
                df_meta_filtered = pd.read_parquet(location_dir / pq_name)
                logger.info(f"[{location_name}] Loaded {df_meta_filtered.shape[0]} image metadata rows from {pq_name}")
            log_memory_usage()

            seq_to_image_ids = df_meta_filtered.groupby('sequence')['id'].agg(list).to_dict()
            lon_center = (city_boundary_info['bbox']['east'] + city_boundary_info['bbox']['west']) / 2
            lat_center = (city_boundary_info['bbox']['north'] + city_boundary_info['bbox']['south']) / 2
            projection = Projection(lat_center, lon_center, max_extent=50e3)

            df_meta_filtered.index = df_meta_filtered["id"]
            image_infos = df_meta_filtered.to_dict(orient="index")
            process_sequence_args = default_cfg

            log_memory_usage()

            dump = {}
            logger.info(f"[{location_name}] Processing downloaded sequences...")

            processed_ids = list()

            for seq_id, seq_image_ids in tqdm(seq_to_image_ids.items()):
                try:
                    d, pi = process_sequence(
                        seq_image_ids,
                        image_infos,
                        projection,
                        process_sequence_args,
                        raw_image_dir,
                        out_image_dir,
                    )
                    if d is None or pi is None:
                        raise Exception("process_sequence returned None")
                    processed_ids.append(pi)
                    dump.update(d)
                except Exception as e:
                    logger.error(f"[{location_name}] Failed to process sequence {seq_id}, skipping it. Error: {repr(e)}.")
                    logger.error(traceback.format_exc())

            write_json(location_dir / "dump.json", dump)

            processed_ids = list(itertools.chain.from_iterable(processed_ids))
            df_meta_filtered_processed = df_meta_filtered[df_meta_filtered["id"].isin(processed_ids)]
            logger.info(f"[{location_name}] Final yield after processing is {df_meta_filtered_processed.shape[0]} images.")
            df_meta_filtered_processed.to_parquet(location_dir / 'image_metadata_filtered_processed.parquet')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--cfg", type=str, default="mia/conf/example.yaml", help="Path to config yaml file.")
    parser.add_argument("--token", type=str, default='mapillary_key', help="Either a token string or a path to a file containing the token.")
    args = parser.parse_args()

    cfgs = OmegaConf.load(args.cfg)

    main(args, cfgs)