| """ |
| |
| Example usage: |
| python3.9 -m mapper.data.debug.calc_stats -d /ocean/projects/cis220039p/shared/map_perception/dataset_v0 |
| """ |
| import datetime |
| from datetime import datetime, timezone, timedelta |
| import time |
| import argparse |
| import os |
| from pathlib import Path |
| import json |
|
|
| from astral import LocationInfo |
| from astral.sun import sun |
| from timezonefinder import TimezoneFinder |
|
|
| import numpy as np |
| import pandas as pd |
| import geopandas as gpd |
| from pyproj.transformer import Transformer |
| from matplotlib import pyplot as plt |
| from matplotlib.backends.backend_pdf import PdfPages |
| import tqdm |
|
|
| from ..fpv import filters |
| from .. import logger |
|
|
|
|
def is_daytime(timestamp, latitude, longitude):
    """Return True if ``timestamp`` falls between sunrise and sunset at a location.

    Args:
        timestamp: POSIX timestamp in seconds (interpreted as UTC).
        latitude: Latitude in degrees.
        longitude: Longitude in degrees.

    Returns:
        True when the sun event immediately preceding ``timestamp`` was a
        sunrise (i.e. the sun is up), False otherwise.

    Raises:
        ValueError: if ``timestamp`` is not bracketed by the computed sun
            events (should not occur given the +/- 1 day window used below).
    """
    # TimezoneFinder construction is expensive (it loads polygon data), so
    # cache a single instance on the function object across calls.
    finder = getattr(is_daytime, "_tz_finder", None)
    if finder is None:
        finder = is_daytime._tz_finder = TimezoneFinder()

    tz_str = finder.timezone_at(lng=longitude, lat=latitude)
    location = LocationInfo(name="", region="", timezone=tz_str,
                            latitude=latitude, longitude=longitude)

    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)

    # Compute sunrise/sunset over the surrounding three days so that dt is
    # guaranteed to be bracketed by events regardless of timezone offsets.
    days = [(dt - timedelta(days=1)).date(),
            dt.date(),
            (dt + timedelta(days=1)).date()]

    events = []
    for day in days:
        s = sun(location.observer, date=day)
        events.append((s['sunrise'], "sunrise"))
        events.append((s['sunset'], 'sunset'))

    events.sort(key=lambda e: e[0])
    # Explicit exception rather than `assert`: asserts are stripped under -O,
    # which previously allowed `prev_event` to be referenced unbound below.
    if not (events[0][0] < dt < events[-1][0]):
        raise ValueError(f"timestamp {dt} is not bracketed by computed sun events")

    # Find the first event strictly after dt; the event before it is the most
    # recent one, and daytime means that event was a sunrise.
    prev_event = None
    for i in range(1, len(events)):
        if dt < events[i][0]:
            prev_event = events[i - 1][1]
            break

    return prev_event == "sunrise"
|
|
def calculate_occupancy_map(df: pd.DataFrame, bev_meter_coverage=112, meters_per_pixel=112):
    """Build a boolean occupancy grid marking ground covered by BEV footprints.

    Each row of ``df`` is projected into the local UTM zone, and a square
    footprint of ``bev_meter_coverage`` meters per side centred on it is
    painted into the grid.

    Args:
        df: DataFrame with 'computed_geometry.long' / 'computed_geometry.lat'
            columns in WGS84 degrees.
        bev_meter_coverage: How much did the BEVs in the dataframe cover in meters.
        meters_per_pixel: At what resolution should we initialize the occupancy map.
            This need not be the same resolution as the BEV. That would be
            unnecessarily slow but most accurate.

    Returns:
        2D boolean ``np.ndarray`` (height x width); True where covered.
    """
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(
            df['computed_geometry.long'],
            df['computed_geometry.lat']),
        crs=4326)

    # Project to the local UTM zone so coordinates are in meters.
    utm_crs = gdf.estimate_utm_crs()
    gdf_utm = gdf.to_crs(utm_crs)
    # Pad the extent by one full BEV so footprints at the border stay in bounds.
    left = gdf_utm.geometry.x.min() - bev_meter_coverage
    right = gdf_utm.geometry.x.max() + bev_meter_coverage
    bottom = gdf_utm.geometry.y.min() - bev_meter_coverage
    top = gdf_utm.geometry.y.max() + bev_meter_coverage

    width = right - left
    height = top - bottom
    width_pixels = int(width // meters_per_pixel)
    height_pixels = int(height // meters_per_pixel)
    if bev_meter_coverage % meters_per_pixel != 0:
        logger.warn(f"bev_meter_coverage {bev_meter_coverage} is not divisble by meters_per_pixel "
                    f"{meters_per_pixel}. Occupancy may be overestimated.")

    # BEV footprint size in map pixels (rounded up, hence possible overestimate).
    bev_pixels = int(np.ceil(bev_meter_coverage / meters_per_pixel))

    logger.info(f"Initializing {height_pixels}x{width_pixels} occupancy map. Using {bev_pixels}x{bev_pixels} pixels for each BEV.")
    # Named 'occupancy' to avoid shadowing the builtin `map`.
    occupancy = np.zeros((height_pixels, width_pixels), dtype=bool)

    # Loop invariants hoisted: how the footprint splits around its centre pixel.
    half_before = bev_pixels // 2
    half_after = bev_pixels - half_before
    for row in gdf_utm.itertuples():
        # Map UTM meters to pixel indices relative to the padded origin.
        img_x = int((row.geometry.x - left) // meters_per_pixel)
        img_y = int((row.geometry.y - bottom) // meters_per_pixel)
        occupancy[img_y - half_before: img_y + half_after,
                  img_x - half_before: img_x + half_after] = True

    return occupancy
|
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_dir", '-d', type=str, required=True, help="Dataset directory")
    parser.add_argument("--locations", '-l', type=str, default="all",
                        help="Location names in CSV format. Set to 'all' to traverse all locations.")
    parser.add_argument("--plot", action="store_true", help="Store plots per location in PDFs")
    parser.add_argument("--output", "-o", default=None, type=str, help="output json file to store statistics")
    args = parser.parse_args()

    # Resolve which location sub-directories to process.
    if args.locations.lower() == "all":
        locations = [l for l in os.listdir(args.dataset_dir)
                     if os.path.isdir(os.path.join(args.dataset_dir, l))]
    else:
        locations = args.locations.split(",")

    logger.info(f"Parsing {len(locations)} locations..")

    all_locs_stats = dict()

    # Register pandas progress_apply support once; no need to re-register
    # inside the per-location loop.
    tqdm.tqdm.pandas()

    dataset_dir = Path(args.dataset_dir)
    for location in tqdm.tqdm(locations):
        location_dir = dataset_dir / location

        pq_name = 'image_metadata_filtered_processed.parquet'
        df = pd.read_parquet(location_dir / pq_name)

        # Keep only rows with a valid computed geolocation.
        df = df[df["computed_geometry.lat"].notna()]
        df = df[df["computed_geometry.long"].notna()]

        logger.info(f"Loaded {df.shape[0]} image metadata from {location}")

        # Discrepancy between reported and computed position
        # (units per filters.haversine_np — presumably meters; verify there).
        df["loc_descrip"] = filters.haversine_np(
            lon1=df["geometry.long"], lat1=df["geometry.lat"],
            lon2=df["computed_geometry.long"], lat2=df["computed_geometry.lat"]
        )

        # Discrepancy between reported and computed compass heading.
        df["angle_descrip"] = filters.angle_dist(
            df["compass_angle"],
            df["computed_compass_angle"]
        )

        meters_per_pixel = 7
        occupancy_map = calculate_occupancy_map(df, bev_meter_coverage=112,
                                                meters_per_pixel=meters_per_pixel)

        # Per-location statistics; sets are converted to lists before JSON dump.
        loc_stats = dict()
        loc_stats["num_images"] = len(df)
        # Each occupied pixel covers meters_per_pixel^2 square meters; 1e-6 -> km^2.
        loc_stats["area_covered_km2"] = np.sum(occupancy_map) * meters_per_pixel ** 2 * 1e-6
        loc_stats["camera_types"] = set(df["camera_type"].unique())
        loc_stats["camera_makes"] = set(df["make"].unique())
        loc_stats["camera_model"] = set(df["model"].unique())

        all_locs_stats[location] = loc_stats

        if args.plot:
            with PdfPages(location_dir / "stats.pdf") as pdf:
                plt.figure()
                plt.imshow(occupancy_map)
                plt.title(f"{location} occupancy map")
                pdf.savefig()
                plt.close()
                for k in ["make", "model", "camera_type", "loc_descrip",
                          "angle_descrip"]:
                    plt.figure()
                    df[k].hist()
                    plt.title(k)
                    plt.xlabel(k)
                    plt.xticks(rotation=90)
                    plt.ylabel("Count")
                    plt.tight_layout()
                    pdf.savefig()
                    plt.close()

    # Aggregate across locations: numbers are summed, sets are unioned
    # (with a companion "<key>_count" cardinality entry).
    aggregated_stats = dict()
    for loc, loc_stats in all_locs_stats.items():
        for k, v in loc_stats.items():
            if isinstance(v, (float, int)):
                if k not in aggregated_stats:
                    aggregated_stats[k] = v
                else:
                    aggregated_stats[k] += v
            elif isinstance(v, set):
                if k not in aggregated_stats:
                    # Copy so the aggregate never aliases a location's own set.
                    aggregated_stats[k] = set(v)
                else:
                    aggregated_stats[k] = aggregated_stats[k].union(v)
                aggregated_stats[f"{k}_count"] = len(aggregated_stats[k])
            else:
                raise Exception(f"{v} is not supported !")

    all_locs_stats["aggregated"] = aggregated_stats

    print(all_locs_stats)

    # JSON cannot encode sets; convert them to lists before dumping.
    for loc, loc_stats in all_locs_stats.items():
        for k, v in loc_stats.items():
            if isinstance(v, set):
                loc_stats[k] = list(v)

    if args.output:
        with open(args.output, "w") as f:
            json.dump(all_locs_stats, f, indent=2)