| |
| """ |
| Paper-style qualitative planning visualization (Figure-4-like): |
| |
| - Left: 6-camera mosaic (2x3) |
| - Right: BEV panel with planned trajectory |
| |
| Optionally overlays the planned trajectory onto CAM_FRONT using *fixed* nuScenes |
| camera calibration (loaded from v1.0-trainval sensor/calibrated_sensor tables). |
| This avoids scanning the 1.3GB sample_data.json. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| import mmap |
| import pickle |
| import sys |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Optional, Tuple |
|
|
| import numpy as np |
|
|
# Make the repository root importable so `src.*` modules resolve when this file
# is run as a standalone script (main() imports src.eval.metrics lazily).
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))




# Camera order of the `image_paths` field in the eval data JSON
# (per the name/usage below; confirm against the data-generation pipeline).
CAM_ORDER_DATAJSON = [
    "CAM_FRONT",
    "CAM_FRONT_RIGHT",
    "CAM_FRONT_LEFT",
    "CAM_BACK",
    "CAM_BACK_LEFT",
    "CAM_BACK_RIGHT",
]


# Camera order used for the paper-style 2x3 mosaic:
# front row left-to-right, then back row left-to-right.
CAM_ORDER_PAPER = [
    "CAM_FRONT_LEFT",
    "CAM_FRONT",
    "CAM_FRONT_RIGHT",
    "CAM_BACK_LEFT",
    "CAM_BACK",
    "CAM_BACK_RIGHT",
]


# Permutation mapping data-JSON order to paper order:
# paper_imgs[i] = datajson_imgs[_IDX_REORDER[i]].
_IDX_REORDER = [2, 0, 1, 4, 3, 5]
|
|
|
|
| def _load_json(path: Path): |
| with path.open("r", encoding="utf-8") as f: |
| return json.load(f) |
|
|
|
|
| def _load_pickle(path: Path): |
| with path.open("rb") as f: |
| return pickle.load(f) |
|
|
|
|
| def _extract_results_list_for_token(mm: "mmap.mmap", token: str) -> List[Dict]: |
| """ |
| Extract `results[token]` list from a nuScenes detection results JSON mmap. |
| |
| The file is a minified JSON like: |
| {"meta": {...}, "results": {"<token>": [ {...}, ... ], ...}} |
| """ |
| pat = (f"\"{token}\":").encode("utf-8") |
| idx = mm.find(pat) |
| if idx < 0: |
| return [] |
| j = idx + len(pat) |
| |
| while j < len(mm) and mm[j] in b" \t\r\n": |
| j += 1 |
| if j >= len(mm) or mm[j : j + 1] != b"[": |
| return [] |
|
|
| start = j |
| depth = 0 |
| in_str = False |
| esc = False |
| k = start |
| end = None |
| while k < len(mm): |
| c = mm[k] |
| if in_str: |
| if esc: |
| esc = False |
| elif c == 0x5C: |
| esc = True |
| elif c == 0x22: |
| in_str = False |
| else: |
| if c == 0x22: |
| in_str = True |
| elif c == 0x5B: |
| depth += 1 |
| elif c == 0x5D: |
| depth -= 1 |
| if depth == 0: |
| end = k + 1 |
| break |
| k += 1 |
|
|
| if end is None: |
| return [] |
| try: |
| return json.loads(mm[start:end].decode("utf-8")) |
| except Exception: |
| return [] |
|
|
|
|
def _load_ego_pose_map(nuscenes_root: Path) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
    """
    Build a map: sample_token -> (R_ego2global (3x3), t_ego2global (3,)).

    Poses are read from the `nuscenes_infos_val.pkl` info file under
    *nuscenes_root*; entries without a token or with a malformed quaternion
    are skipped.
    """
    pkl = nuscenes_root / "nuscenes_infos_val.pkl"
    if not pkl.exists():
        raise FileNotFoundError(f"Missing {pkl} (required to transform detector global boxes to ego frame).")
    loaded = _load_pickle(pkl)
    # The pickle may be either {"infos": [...]} or a bare list of infos.
    infos = loaded["infos"] if isinstance(loaded, dict) and "infos" in loaded else loaded
    pose_map: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
    for info in infos:
        token = str(info.get("token", ""))
        if not token:
            continue
        trans = np.asarray(info.get("ego2global_translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
        quat = info.get("ego2global_rotation", [1.0, 0.0, 0.0, 0.0])
        if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
            continue
        rot = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))
        pose_map[token] = (rot, trans)
    return pose_map
|
|
|
|
| def _quat_to_rotmat(qw: float, qx: float, qy: float, qz: float) -> np.ndarray: |
| |
| n = math.sqrt(qw * qw + qx * qx + qy * qy + qz * qz) |
| if n < 1e-12: |
| return np.eye(3, dtype=np.float64) |
| qw, qx, qy, qz = qw / n, qx / n, qy / n, qz / n |
| |
| xx, yy, zz = qx * qx, qy * qy, qz * qz |
| xy, xz, yz = qx * qy, qx * qz, qy * qz |
| wx, wy, wz = qw * qx, qw * qy, qw * qz |
| return np.array( |
| [ |
| [1.0 - 2.0 * (yy + zz), 2.0 * (xy - wz), 2.0 * (xz + wy)], |
| [2.0 * (xy + wz), 1.0 - 2.0 * (xx + zz), 2.0 * (yz - wx)], |
| [2.0 * (xz - wy), 2.0 * (yz + wx), 1.0 - 2.0 * (xx + yy)], |
| ], |
| dtype=np.float64, |
| ) |
|
|
|
|
def _load_fixed_cam_front_calib(nuscenes_root: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Return (R_c2e, t_c2e, K) for CAM_FRONT.

    Looks up the CAM_FRONT sensor token in v1.0-trainval/sensor.json, then
    takes the *first* calibrated_sensor.json record for that sensor (fixed
    calibration, so any record is representative for this visualization).
    """
    meta_root = nuscenes_root / "v1.0-trainval"
    sensor_records = _load_json(meta_root / "sensor.json")
    calib_records = _load_json(meta_root / "calibrated_sensor.json")

    sensor_token = next(
        (rec.get("token") for rec in sensor_records if rec.get("channel") == "CAM_FRONT"),
        None,
    )
    if sensor_token is None:
        raise RuntimeError("CAM_FRONT not found in sensor.json")

    calib_rec = next(
        (rec for rec in calib_records if rec.get("sensor_token") == sensor_token),
        None,
    )
    if calib_rec is None:
        raise RuntimeError("No calibrated_sensor record found for CAM_FRONT")

    t_c2e = np.asarray(calib_rec.get("translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
    quat = calib_rec.get("rotation", [1.0, 0.0, 0.0, 0.0])
    if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
        raise RuntimeError(f"Unexpected CAM_FRONT rotation quaternion: {quat}")
    R_c2e = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))

    K = np.asarray(calib_rec.get("camera_intrinsic", np.eye(3).tolist()), dtype=np.float64)
    if K.shape != (3, 3):
        raise RuntimeError(f"Unexpected CAM_FRONT camera_intrinsic shape: {K.shape}")
    return R_c2e, t_c2e, K
|
|
|
|
def _load_fixed_cam_calibs(nuscenes_root: Path) -> Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Load fixed (R_c2e, t_c2e, K) for every nuScenes camera, keyed by channel.

    Channels without a usable calibrated_sensor record (no intrinsics,
    malformed rotation quaternion, or non-3x3 K) are silently skipped.
    """
    meta_root = nuscenes_root / "v1.0-trainval"
    sensor_records = _load_json(meta_root / "sensor.json")
    calib_records = _load_json(meta_root / "calibrated_sensor.json")

    # channel -> sensor token (later records win on duplicate channels).
    token_by_channel: Dict[str, str] = {
        rec["channel"]: rec["token"]
        for rec in sensor_records
        if isinstance(rec.get("channel"), str) and isinstance(rec.get("token"), str)
    }

    # sensor token -> calibrated_sensor record (later records win).
    calib_by_token: Dict[str, Dict] = {
        rec["sensor_token"]: rec
        for rec in calib_records
        if isinstance(rec.get("sensor_token"), str)
    }

    calibs: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = {}
    for channel, token in token_by_channel.items():
        rec = calib_by_token.get(token)
        if not rec or "camera_intrinsic" not in rec:
            continue
        trans = np.asarray(rec.get("translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
        quat = rec.get("rotation", [1.0, 0.0, 0.0, 0.0])
        if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
            continue
        rot = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))
        K = np.asarray(rec.get("camera_intrinsic", np.eye(3).tolist()), dtype=np.float64)
        if K.shape != (3, 3):
            continue
        calibs[channel] = (rot, trans, K)
    return calibs
|
|
|
|
| def _paper_xy_to_nuscenes_ego_xyz(x_right: float, y_fwd: float, z_up: float = 0.0) -> np.ndarray: |
| |
| x_fwd = float(y_fwd) |
| y_left = float(-x_right) |
| return np.array([x_fwd, y_left, float(z_up)], dtype=np.float64) |
|
|
|
|
| def _project_ego_points_to_cam( |
| pts_ego_xyz: np.ndarray, |
| R_c2e: np.ndarray, |
| t_c2e: np.ndarray, |
| K: np.ndarray, |
| ) -> np.ndarray: |
| """ |
| Project nuScenes ego-frame 3D points to pixel coords in CAM_FRONT. |
| Returns (M,2) pixels for points with z_cam > 0. |
| """ |
| |
| R_e2c = R_c2e.T |
| pts_cam = (R_e2c @ (pts_ego_xyz - t_c2e[None, :]).T).T |
| z = pts_cam[:, 2] |
| keep = z > 1e-3 |
| pts_cam = pts_cam[keep] |
| if pts_cam.shape[0] == 0: |
| return np.zeros((0, 2), dtype=np.float64) |
| x = pts_cam[:, 0] / pts_cam[:, 2] |
| y = pts_cam[:, 1] / pts_cam[:, 2] |
| fx, fy = float(K[0, 0]), float(K[1, 1]) |
| cx, cy = float(K[0, 2]), float(K[1, 2]) |
| u = fx * x + cx |
| v = fy * y + cy |
| return np.stack([u, v], axis=1) |
|
|
|
|
| def _project_one_ego_point_to_cam( |
| pt_ego_xyz: np.ndarray, |
| R_c2e: np.ndarray, |
| t_c2e: np.ndarray, |
| K: np.ndarray, |
| ) -> Optional[Tuple[float, float, float]]: |
| """Return (u, v, z_cam) or None if behind camera.""" |
| pt_ego_xyz = np.asarray(pt_ego_xyz, dtype=np.float64).reshape(3) |
| R_e2c = R_c2e.T |
| pt_cam = R_e2c @ (pt_ego_xyz - t_c2e) |
| zc = float(pt_cam[2]) |
| if zc <= 1e-3: |
| return None |
| fx, fy = float(K[0, 0]), float(K[1, 1]) |
| cx, cy = float(K[0, 2]), float(K[1, 2]) |
| u = fx * float(pt_cam[0] / zc) + cx |
| v = fy * float(pt_cam[1] / zc) + cy |
| return float(u), float(v), zc |
|
|
|
|
def _draw_ego_bev(ax, *, color: str = "k"):
    """Draw the ego-vehicle footprint, a heading arrow, and an 'EGO' label on a BEV axes."""
    import matplotlib.patches as patches

    # Footprint in meters (width x length) — presumably the nuScenes ego car
    # dimensions; confirm against the dataset spec if precision matters.
    width, length = 1.85, 4.084
    footprint = patches.Rectangle(
        (-width / 2.0, -length / 2.0),
        width,
        length,
        linewidth=1.2,
        edgecolor=color,
        facecolor="none",
        zorder=10,
    )
    ax.add_patch(footprint)
    # Arrow along +Y (forward in paper BEV coordinates).
    ax.arrow(0.0, 0.0, 0.0, length * 0.8, head_width=0.6, head_length=0.8, fc=color, ec=color, zorder=11)
    ax.text(0.2, -0.6, "EGO", color=color, fontsize=8, zorder=12)
|
|
|
|
| def _box_corners_xy(cx: float, cy: float, w: float, l: float, yaw: float) -> np.ndarray: |
| """ |
| Oriented rectangle corners in XY (paper coords). |
| yaw is treated as radians in the same XY frame (best-effort). |
| |
| IMPORTANT: In our planning eval JSON, `yaw` behaves like a standard heading |
| angle measured from +X (right) axis (yaw-from-x). So: |
| - yaw = 0 => vehicle length points to +X (right) |
| - yaw = +pi/2 => vehicle length points to +Y (forward) |
| """ |
| c, s = math.cos(yaw), math.sin(yaw) |
| center = np.array([cx, cy], dtype=np.float64) |
|
|
| |
| d_len = np.array([c, s], dtype=np.float64) * (l / 2.0) |
| d_wid = np.array([-s, c], dtype=np.float64) * (w / 2.0) |
|
|
| corners = np.stack( |
| [ |
| center + d_len + d_wid, |
| center + d_len - d_wid, |
| center - d_len - d_wid, |
| center - d_len + d_wid, |
| ], |
| axis=0, |
| ) |
| return corners |
|
|
|
|
| def _title_case_command(cmd: str) -> str: |
| c = (cmd or "").strip().lower() |
| if c == "turn left": |
| return "Turn Left" |
| if c == "turn right": |
| return "Turn Right" |
| if c == "go straight": |
| return "Go Straight" |
| return cmd |
|
|
|
|
| def _short_cat(cat: str) -> str: |
| c = (cat or "").strip() |
| if not c: |
| return "obj" |
| tail = c.split(".")[-1] |
| mapping = { |
| "car": "car", |
| "truck": "truck", |
| "trailer": "trailer", |
| "bus": "bus", |
| "construction": "cveh", |
| "construction_vehicle": "cveh", |
| "pedestrian": "ped", |
| "trafficcone": "cone", |
| "traffic_cone": "cone", |
| "barrier": "barrier", |
| "motorcycle": "moto", |
| "bicycle": "bike", |
| } |
| return mapping.get(tail, tail[:10]) |
|
|
|
|
| def _select_default_samples(data_items: List[Dict]) -> List[str]: |
| |
| by_cmd: Dict[str, str] = {} |
| for it in data_items: |
| cmd = (it.get("ego_motion", {}) or {}).get("command") |
| if cmd and cmd not in by_cmd and it.get("id"): |
| by_cmd[str(cmd)] = str(it["id"]) |
| out = [] |
| if "turn right" in by_cmd: |
| out.append(by_cmd["turn right"]) |
| if "go straight" in by_cmd: |
| out.append(by_cmd["go straight"]) |
| if not out: |
| out = [str(it.get("id")) for it in data_items[:2] if it.get("id")] |
| return out[:2] |
|
|
|
|
def parse_args():
    """Parse CLI arguments for the qualitative planning figure."""
    ap = argparse.ArgumentParser()
    # Planning eval output (model predictions) and the matching eval data JSON.
    ap.add_argument("--eval_json", type=str, default="work_dirs/eval_nocap_ep2_plan50.json")
    ap.add_argument("--data_json", type=str, default="data/_eval_ep2_plan50.json")
    ap.add_argument("--data_root", type=str, default="/home/guoyuanbo/autodl-tmp/data/nuscenes")
    ap.add_argument("--out_png", type=str, default="work_dirs/atlas_planning_qualitative.png")
    ap.add_argument("--sample_ids", type=str, nargs="*", default=None, help="Provide 1-2 sample ids; default picks turn right & go straight.")
    # BEV axis extents in meters (paper coords: x = right, y = forward).
    ap.add_argument("--bev_xlim", type=float, nargs=2, default=[-30.0, 30.0])
    ap.add_argument("--bev_ylim", type=float, nargs=2, default=[-10.0, 55.0])
    ap.add_argument("--dpi", type=int, default=200)
    # Boolean toggles default to on; each positive flag has a paired --no_*
    # flag that flips the same dest off. (The store_true flags are no-ops
    # given default=True; they exist for CLI symmetry.)
    ap.add_argument("--draw_gt_boxes", action="store_true", default=True)
    ap.add_argument("--no_draw_gt_boxes", action="store_false", dest="draw_gt_boxes")
    ap.add_argument("--overlay_on_front_cam", action="store_true", default=True)
    ap.add_argument("--no_overlay_on_front_cam", action="store_false", dest="overlay_on_front_cam")
    ap.add_argument("--highlight_front_visible_boxes", action="store_true", default=True)
    ap.add_argument("--no_highlight_front_visible_boxes", action="store_false", dest="highlight_front_visible_boxes")
    ap.add_argument("--max_front_labels", type=int, default=8, help="Max GT box labels shown on CAM_FRONT and BEV.")
    ap.add_argument("--max_cam_labels", type=int, default=4, help="Max GT labels per camera view (all 6 views).")
    ap.add_argument("--bev_only_visible", action="store_true", default=True, help="If set, BEV draws only FRONT* visible GT boxes.")
    ap.add_argument("--bev_show_all", action="store_false", dest="bev_only_visible", help="Draw all GT boxes faintly in BEV (plus highlighted).")
    # Optional detector (StreamPETR) results for overlaying predicted boxes.
    ap.add_argument(
        "--det_results_json",
        type=str,
        default="external/StreamPETR/val/work_dirs/streampetr_atlas_aligned/Tue_Feb_10_23_13_58_2026/pts_bbox/results_nusc.json",
        help="nuScenes detection results JSON (StreamPETR).",
    )
    ap.add_argument("--draw_det_pred_boxes", action="store_true", default=True)
    ap.add_argument("--no_draw_det_pred_boxes", action="store_false", dest="draw_det_pred_boxes")
    ap.add_argument("--det_score_thresh", type=float, default=0.3)
    ap.add_argument("--det_max_boxes", type=int, default=30)
    ap.add_argument("--det_label_topk", type=int, default=6, help="Label top-k detector boxes in BEV.")
    return ap.parse_args()
|
|
|
|
def main():
    """
    Render the paper-style qualitative figure: for each selected sample, a 2x3
    six-camera mosaic (left) and a BEV panel with GT/predicted trajectories and
    boxes (right); saves the result to --out_png.
    """
    args = parse_args()
    repo = _REPO_ROOT
    eval_path = (repo / args.eval_json).resolve()
    data_path = (repo / args.data_json).resolve()
    nuscenes_root = Path(args.data_root).resolve()
    out_png = (repo / args.out_png).resolve()

    eval_obj = _load_json(eval_path)
    data_items = _load_json(data_path)

    # sample_id -> generated planning text, from the eval predictions.
    pred_text_by_id: Dict[str, str] = {}
    for rec in eval_obj.get("predictions", []):
        sid = str(rec.get("sample_id", ""))
        if not sid:
            continue
        pred_text_by_id[sid] = str(rec.get("generated_text", ""))

    item_by_id: Dict[str, Dict] = {str(it.get("id")): it for it in data_items if it.get("id")}

    # Resolve the sample ids to visualize (at most two rows).
    sample_ids = list(args.sample_ids) if args.sample_ids else _select_default_samples(data_items)
    sample_ids = [sid for sid in sample_ids if sid in item_by_id]
    if not sample_ids:
        raise RuntimeError("No valid sample_ids found. Provide --sample_ids explicitly.")
    if len(sample_ids) > 2:
        sample_ids = sample_ids[:2]

    # Fixed per-channel camera calibration (only needed for image overlays).
    cam_calibs: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = {}
    if args.overlay_on_front_cam:
        cam_calibs = _load_fixed_cam_calibs(nuscenes_root)

    # Imported lazily: requires the repo root on sys.path (set at module top).
    from src.eval.metrics import parse_planning_output

    # Optionally mmap the (large) detector results JSON; any failure degrades
    # gracefully to "no detector overlay" rather than aborting the figure.
    det_mm = None
    det_f = None
    ego_pose_map = None
    det_results_path = (repo / args.det_results_json).resolve()
    if args.draw_det_pred_boxes and det_results_path.exists():
        try:
            ego_pose_map = _load_ego_pose_map(nuscenes_root)
            det_f = det_results_path.open("rb")
            det_mm = mmap.mmap(det_f.fileno(), 0, access=mmap.ACCESS_READ)
        except Exception:
            det_mm = None
            if det_f is not None:
                det_f.close()
                det_f = None

    import matplotlib.pyplot as plt
    from matplotlib.gridspec import GridSpec, GridSpecFromSubplotSpec

    nrows = len(sample_ids)
    fig = plt.figure(figsize=(12.5, 6.5 * nrows), dpi=args.dpi)
    fig.suptitle("Atlas Planning — 6-Camera + BEV (Paper-style)", fontsize=16, y=0.99)

    # One row per sample: wide camera mosaic on the left, BEV on the right.
    outer = GridSpec(nrows, 2, figure=fig, width_ratios=[3.2, 1.4], wspace=0.10, hspace=0.18)

    from PIL import Image

    for r, sid in enumerate(sample_ids):
        item = item_by_id[sid]
        pred_text = pred_text_by_id.get(sid, "")
        plan = parse_planning_output(pred_text) if pred_text else None
        pred_wps = (plan or {}).get("waypoints", []) if plan else []
        gt_wps = (item.get("ego_motion", {}) or {}).get("waypoints", []) or []
        cmd = (item.get("ego_motion", {}) or {}).get("command", "")

        # Load the 6 camera images; relative paths resolve under data_root.
        rel_paths: List[str] = list(item.get("image_paths", []) or [])
        if len(rel_paths) != 6:
            raise RuntimeError(f"sample {sid}: expected 6 image_paths, got {len(rel_paths)}")
        imgs = []
        for rp in rel_paths:
            p = Path(rp)
            if not p.is_absolute():
                p = nuscenes_root / rp
            imgs.append(Image.open(p).convert("RGB"))
        # Reorder from data-JSON order into the paper's 2x3 mosaic order.
        imgs = [imgs[i] for i in _IDX_REORDER]
        front_w, front_h = imgs[1].size  # CAM_FRONT size (currently unused downstream)

        # Left panel: 2x3 camera mosaic.
        left = GridSpecFromSubplotSpec(2, 3, subplot_spec=outer[r, 0], wspace=0.02, hspace=0.06)
        ax_imgs = []
        for i in range(6):
            ax = fig.add_subplot(left[i // 3, i % 3])
            ax.imshow(imgs[i])

            # Pin axis limits to pixel extents (y inverted: image convention).
            w_i, h_i = imgs[i].size
            ax.set_xlim(0, w_i)
            ax.set_ylim(h_i, 0)
            ax.set_title(CAM_ORDER_PAPER[i], fontsize=9)
            ax.axis("off")
            ax_imgs.append(ax)

        boxes = item.get("gt_boxes_3d", []) or []

        def _visible_for_channel(channel: str, img_w: int, img_h: int):
            # GT box centers visible in `channel`, sorted by BEV distance.
            # Tuples: (dist, box_idx, category, x_right, y_fwd, z_up, u, v).
            if not (args.highlight_front_visible_boxes and args.overlay_on_front_cam):
                return []
            if channel not in cam_calibs:
                return []
            R_c2e, t_c2e, K = cam_calibs[channel]
            out = []
            for bi, b in enumerate(boxes):
                if not isinstance(b, dict) or "world_coords" not in b:
                    continue
                wc = b.get("world_coords", [0.0, 0.0, 0.0])
                if not (isinstance(wc, (list, tuple)) and len(wc) >= 3):
                    continue
                x_r, y_f, z_u = float(wc[0]), float(wc[1]), float(wc[2])
                pt_ego = _paper_xy_to_nuscenes_ego_xyz(x_r, y_f, z_u)
                uv = _project_one_ego_point_to_cam(pt_ego, R_c2e, t_c2e, K)
                if uv is None:
                    continue
                u, v, _zc = uv
                if 0.0 <= u <= float(img_w) and 0.0 <= v <= float(img_h):
                    d = float(math.hypot(x_r, y_f))
                    out.append((d, bi, str(b.get("category", "")), x_r, y_f, z_u, u, v))
            out.sort(key=lambda t: t[0])
            return out

        vis_by_ch = {
            "CAM_FRONT_LEFT": _visible_for_channel("CAM_FRONT_LEFT", imgs[0].size[0], imgs[0].size[1]),
            "CAM_FRONT": _visible_for_channel("CAM_FRONT", imgs[1].size[0], imgs[1].size[1]),
            "CAM_FRONT_RIGHT": _visible_for_channel("CAM_FRONT_RIGHT", imgs[2].size[0], imgs[2].size[1]),
            "CAM_BACK_LEFT": _visible_for_channel("CAM_BACK_LEFT", imgs[3].size[0], imgs[3].size[1]),
            "CAM_BACK": _visible_for_channel("CAM_BACK", imgs[4].size[0], imgs[4].size[1]),
            "CAM_BACK_RIGHT": _visible_for_channel("CAM_BACK_RIGHT", imgs[5].size[0], imgs[5].size[1]),
        }
        # Deduplicated union across channels, keeping each box's first tuple,
        # then re-sorted by distance.
        visible_union_all = []
        if any(vis_by_ch.get(ch) for ch in vis_by_ch.keys()):
            seen = set()
            for ch in ("CAM_FRONT_LEFT", "CAM_FRONT", "CAM_FRONT_RIGHT", "CAM_BACK_LEFT", "CAM_BACK", "CAM_BACK_RIGHT"):
                for tup in vis_by_ch.get(ch, []):
                    bi = tup[1]
                    if bi in seen:
                        continue
                    seen.add(bi)
                    visible_union_all.append(tup)
            visible_union_all.sort(key=lambda t: t[0])
        visible_union = visible_union_all[: max(int(args.max_front_labels), 0)]

        # Overlay planned (and GT) trajectories on CAM_FRONT.
        if args.overlay_on_front_cam and pred_wps and "CAM_FRONT" in cam_calibs:
            R_c2e, t_c2e, K = cam_calibs["CAM_FRONT"]
            ax_front = ax_imgs[1]

            pts_pred = np.array([_paper_xy_to_nuscenes_ego_xyz(x, y, 0.0) for x, y in pred_wps], dtype=np.float64)
            uv_pred = _project_ego_points_to_cam(pts_pred, R_c2e, t_c2e, K)
            if uv_pred.shape[0] >= 2:
                ax_front.plot(uv_pred[:, 0], uv_pred[:, 1], "-o", color="#00C2FF", linewidth=2.0, markersize=4.0, alpha=0.95)

            if gt_wps:
                pts_gt = np.array([_paper_xy_to_nuscenes_ego_xyz(x, y, 0.0) for x, y in gt_wps], dtype=np.float64)
                uv_gt = _project_ego_points_to_cam(pts_gt, R_c2e, t_c2e, K)
                if uv_gt.shape[0] >= 2:
                    ax_front.plot(uv_gt[:, 0], uv_gt[:, 1], "--o", color="white", linewidth=2.0, markersize=3.5, alpha=0.85)

        # Mark visible GT box centers (with category labels) on each camera.
        if args.highlight_front_visible_boxes and args.overlay_on_front_cam:
            cam_axes = {
                "CAM_FRONT_LEFT": ax_imgs[0],
                "CAM_FRONT": ax_imgs[1],
                "CAM_FRONT_RIGHT": ax_imgs[2],
                "CAM_BACK_LEFT": ax_imgs[3],
                "CAM_BACK": ax_imgs[4],
                "CAM_BACK_RIGHT": ax_imgs[5],
            }
            for ch, ax in cam_axes.items():
                vis = (vis_by_ch.get(ch, []) or [])[: max(int(args.max_cam_labels), 0)]
                if not vis:
                    continue
                for _d, _bi, cat, _x, _y, _z, u, v in vis:
                    ax.scatter([u], [v], s=26, c="#FF4D4D", edgecolors="black", linewidths=0.6, zorder=20)
                    ax.text(
                        u + 6.0,
                        v - 6.0,
                        _short_cat(cat),
                        color="white",
                        fontsize=8,
                        fontweight="bold",
                        ha="left",
                        va="bottom",
                        zorder=21,
                        bbox=dict(boxstyle="round,pad=0.15", facecolor="black", edgecolor="none", alpha=0.6),
                    )

        # Right panel: BEV in paper coords (x = right, y = forward).
        ax_bev = fig.add_subplot(outer[r, 1])
        ax_bev.set_title("BEV", fontsize=12)
        ax_bev.set_xlabel("X (m) — right", fontsize=9)
        ax_bev.set_ylabel("Y (m) — forward", fontsize=9)
        ax_bev.set_aspect("equal", adjustable="box")
        ax_bev.grid(True, linewidth=0.4, alpha=0.35)
        ax_bev.set_xlim(float(args.bev_xlim[0]), float(args.bev_xlim[1]))
        ax_bev.set_ylim(float(args.bev_ylim[0]), float(args.bev_ylim[1]))

        _draw_ego_bev(ax_bev, color="black")

        # GT boxes in BEV (optionally restricted to camera-visible ones).
        if args.draw_gt_boxes:
            boxes = item.get("gt_boxes_3d", []) or []

            if args.bev_only_visible and visible_union_all:
                boxes_to_draw = [boxes[bi] for _d, bi, _cat, *_rest in visible_union_all if 0 <= bi < len(boxes)]
            else:
                boxes_to_draw = boxes

            # Faint outlines for all drawn GT boxes.
            for b in boxes_to_draw:
                if not isinstance(b, dict) or "world_coords" not in b:
                    continue
                wc = b.get("world_coords", [0.0, 0.0, 0.0])
                cx, cy = float(wc[0]), float(wc[1])
                w = float(b.get("w", 1.8))
                l = float(b.get("l", 4.0))
                yaw = float(b.get("yaw", 0.0))
                corners = _box_corners_xy(cx, cy, w, l, yaw)
                poly = np.vstack([corners, corners[0:1]])
                ax_bev.plot(poly[:, 0], poly[:, 1], color="#FF5A5A", linewidth=0.8, alpha=0.50)

            # Strong outlines + labels for the top camera-visible boxes.
            if args.highlight_front_visible_boxes and visible_union:
                for _d, bi, cat, x_r, y_f, _z, _u, _v in visible_union:
                    if bi >= len(boxes):
                        continue
                    b = boxes[bi]
                    wc = b.get("world_coords", [0.0, 0.0, 0.0])
                    cx, cy = float(wc[0]), float(wc[1])
                    w = float(b.get("w", 1.8))
                    l = float(b.get("l", 4.0))
                    yaw = float(b.get("yaw", 0.0))
                    corners = _box_corners_xy(cx, cy, w, l, yaw)
                    poly = np.vstack([corners, corners[0:1]])
                    ax_bev.plot(poly[:, 0], poly[:, 1], color="#FF2E2E", linewidth=1.8, alpha=0.95)
                    ax_bev.text(
                        cx,
                        cy,
                        _short_cat(cat),
                        fontsize=7.5,
                        color="#FF2E2E",
                        ha="center",
                        va="center",
                        zorder=30,
                        bbox=dict(boxstyle="round,pad=0.12", facecolor="white", edgecolor="none", alpha=0.55),
                    )

        # Detector predicted boxes in BEV (global frame -> ego -> paper XY).
        det_handle = None
        if args.draw_det_pred_boxes and det_mm is not None and ego_pose_map is not None:
            R_e2g_t = None
            t_e2g_t = None
            if sid in ego_pose_map:
                R_e2g_t, t_e2g_t = ego_pose_map[sid]
            if R_e2g_t is not None and t_e2g_t is not None:
                dets = _extract_results_list_for_token(det_mm, sid)

                det_plot = []
                xlo, xhi = float(args.bev_xlim[0]), float(args.bev_xlim[1])
                ylo, yhi = float(args.bev_ylim[0]), float(args.bev_ylim[1])
                for d in dets:
                    try:
                        score = float(d.get("detection_score", 0.0))
                    except Exception:
                        score = 0.0
                    if score < float(args.det_score_thresh):
                        continue
                    tr = d.get("translation", None)
                    sz = d.get("size", None)
                    rot = d.get("rotation", None)
                    if not (isinstance(tr, (list, tuple)) and len(tr) >= 3):
                        continue
                    if not (isinstance(sz, (list, tuple)) and len(sz) >= 3):
                        continue
                    if not (isinstance(rot, (list, tuple)) and len(rot) == 4):
                        continue

                    p_g = np.asarray([float(tr[0]), float(tr[1]), float(tr[2])], dtype=np.float64)

                    # Global -> ego (inverse of ego2global).
                    p_e = (R_e2g_t.T @ (p_g - t_e2g_t)).reshape(3)

                    # nuScenes ego (x fwd, y left) -> paper BEV (x right, y fwd).
                    x_p = float(-p_e[1])
                    y_p = float(p_e[0])
                    z_p = float(p_e[2])
                    if not (xlo <= x_p <= xhi and ylo <= y_p <= yhi):
                        continue

                    # Box yaw in ego frame; +pi/2 converts ego heading (from
                    # +x fwd) to the paper convention used by _box_corners_xy.
                    R_box_g = _quat_to_rotmat(float(rot[0]), float(rot[1]), float(rot[2]), float(rot[3]))
                    R_box_e = R_e2g_t.T @ R_box_g
                    yaw_e = float(math.atan2(R_box_e[1, 0], R_box_e[0, 0]))
                    yaw_p = yaw_e + math.pi / 2.0

                    # nuScenes size is (w, l, h).
                    w = float(sz[0])
                    l = float(sz[1])
                    cat = str(d.get("detection_name", "obj"))
                    det_plot.append((score, cat, x_p, y_p, z_p, w, l, yaw_p))

                # Keep only the highest-scoring boxes.
                det_plot.sort(key=lambda t: -t[0])
                det_plot = det_plot[: max(int(args.det_max_boxes), 0)]

                det_color = "#00A65A"
                for score, cat, x_p, y_p, _z, w, l, yaw_p in det_plot:
                    corners = _box_corners_xy(x_p, y_p, w, l, yaw_p)
                    poly = np.vstack([corners, corners[0:1]])
                    ax_bev.plot(poly[:, 0], poly[:, 1], color=det_color, linewidth=1.1, alpha=0.65, linestyle="--")

                # Label only the top-k detector boxes to avoid clutter.
                for score, cat, x_p, y_p, _z, w, l, yaw_p in det_plot[: max(int(args.det_label_topk), 0)]:
                    ax_bev.text(
                        x_p,
                        y_p,
                        _short_cat(cat),
                        fontsize=7.2,
                        color=det_color,
                        ha="center",
                        va="center",
                        zorder=31,
                        bbox=dict(boxstyle="round,pad=0.12", facecolor="white", edgecolor="none", alpha=0.55),
                    )

                # Proxy artist for the legend (dashed boxes have no label).
                from matplotlib.lines import Line2D
                det_handle = Line2D([0], [0], color=det_color, lw=1.5, linestyle="--", label="Det Pred boxes")

        # GT and predicted trajectories in BEV.
        if gt_wps:
            gt_arr = np.asarray(gt_wps, dtype=np.float64)
            ax_bev.plot(gt_arr[:, 0], gt_arr[:, 1], "--o", color="black", linewidth=1.8, markersize=4.0, alpha=0.85, label="GT traj")
        if pred_wps:
            pr_arr = np.asarray(pred_wps, dtype=np.float64)
            ax_bev.plot(pr_arr[:, 0], pr_arr[:, 1], "-o", color="#00C2FF", linewidth=2.4, markersize=4.5, alpha=0.95, label="Pred traj")

        # Legend: auto-collected handles plus proxies for box styles.
        handles, _labels = ax_bev.get_legend_handles_labels()
        if args.draw_gt_boxes:
            import matplotlib.patches as mpatches
            handles.append(mpatches.Patch(facecolor="none", edgecolor="#FF5A5A", linewidth=1.2, label="GT boxes"))
        if det_handle is not None:
            handles.append(det_handle)
        ax_bev.legend(handles=handles, loc="upper left", fontsize=9, frameon=True)

        # Driving command badge in the BEV corner (axes coordinates).
        ax_bev.text(
            0.02,
            0.02,
            _title_case_command(str(cmd)),
            transform=ax_bev.transAxes,
            fontsize=12,
            fontweight="bold",
            ha="left",
            va="bottom",
            color="black",
            bbox=dict(boxstyle="round,pad=0.25", facecolor="white", edgecolor="none", alpha=0.65),
        )

    out_png.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out_png, bbox_inches="tight")
    plt.close(fig)
    print(f"[saved] {out_png}")

    # Best-effort cleanup of the mmap'd detector file.
    if det_mm is not None:
        try:
            det_mm.close()
        except Exception:
            pass
    if det_f is not None:
        try:
            det_f.close()
        except Exception:
            pass
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|