# Atlas-online / work_dirs / visualization_collection / scripts / vis_atlas_planning_qualitative.py
# Uploaded by guoyb0 via the upload-large-folder tool (commit 7dfc72e, verified).
#!/usr/bin/env python3
"""
Paper-style qualitative planning visualization (Figure-4-like):
- Left: 6-camera mosaic (2x3)
- Right: BEV panel with planned trajectory
Optionally overlays the planned trajectory onto CAM_FRONT using *fixed* nuScenes
camera calibration (loaded from v1.0-trainval sensor/calibrated_sensor tables).
This avoids scanning the 1.3GB sample_data.json.
"""
from __future__ import annotations
import argparse
import json
import math
import mmap
import pickle
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import numpy as np
# Make the repository root importable so `src.*` modules resolve when this
# file is run directly as a script (see the `from src.eval.metrics import ...`
# inside main()).
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))

# Camera channel order as stored in the data JSON's `image_paths` field.
CAM_ORDER_DATAJSON = [
    "CAM_FRONT",
    "CAM_FRONT_RIGHT",
    "CAM_FRONT_LEFT",
    "CAM_BACK",
    "CAM_BACK_LEFT",
    "CAM_BACK_RIGHT",
]
# Camera layout used by the paper-style 2x3 mosaic (top row: front cameras
# left-to-right; bottom row: back cameras left-to-right).
CAM_ORDER_PAPER = [
    "CAM_FRONT_LEFT",
    "CAM_FRONT",
    "CAM_FRONT_RIGHT",
    "CAM_BACK_LEFT",
    "CAM_BACK",
    "CAM_BACK_RIGHT",
]
_IDX_REORDER = [2, 0, 1, 4, 3, 5]  # data_json -> paper layout
def _load_json(path: Path):
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def _load_pickle(path: Path):
with path.open("rb") as f:
return pickle.load(f)
def _extract_results_list_for_token(mm: "mmap.mmap", token: str) -> List[Dict]:
"""
Extract `results[token]` list from a nuScenes detection results JSON mmap.
The file is a minified JSON like:
{"meta": {...}, "results": {"<token>": [ {...}, ... ], ...}}
"""
pat = (f"\"{token}\":").encode("utf-8")
idx = mm.find(pat)
if idx < 0:
return []
j = idx + len(pat)
# Skip whitespace
while j < len(mm) and mm[j] in b" \t\r\n":
j += 1
if j >= len(mm) or mm[j : j + 1] != b"[":
return []
start = j
depth = 0
in_str = False
esc = False
k = start
end = None
while k < len(mm):
c = mm[k]
if in_str:
if esc:
esc = False
elif c == 0x5C: # backslash
esc = True
elif c == 0x22: # quote
in_str = False
else:
if c == 0x22:
in_str = True
elif c == 0x5B: # [
depth += 1
elif c == 0x5D: # ]
depth -= 1
if depth == 0:
end = k + 1
break
k += 1
if end is None:
return []
try:
return json.loads(mm[start:end].decode("utf-8"))
except Exception:
return []
def _load_ego_pose_map(nuscenes_root: Path) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
    """
    Build a map: sample_token -> (R_ego2global(3x3), t_ego2global(3,))
    from the `nuscenes_infos_val.pkl` file under *nuscenes_root*.

    Raises FileNotFoundError when the pickle is missing.
    """
    pkl_path = nuscenes_root / "nuscenes_infos_val.pkl"
    if not pkl_path.exists():
        raise FileNotFoundError(f"Missing {pkl_path} (required to transform detector global boxes to ego frame).")
    loaded = _load_pickle(pkl_path)
    # The pickle is either {"infos": [...]} or a bare list of info dicts.
    records = loaded["infos"] if isinstance(loaded, dict) and "infos" in loaded else loaded
    pose_map: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
    for rec in records:
        token = str(rec.get("token", ""))
        if not token:
            continue
        trans = np.asarray(rec.get("ego2global_translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
        quat = rec.get("ego2global_rotation", [1.0, 0.0, 0.0, 0.0])
        if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
            continue
        rot = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))
        pose_map[token] = (rot, trans)
    return pose_map
def _quat_to_rotmat(qw: float, qx: float, qy: float, qz: float) -> np.ndarray:
# nuScenes quaternions are in (w, x, y, z)
n = math.sqrt(qw * qw + qx * qx + qy * qy + qz * qz)
if n < 1e-12:
return np.eye(3, dtype=np.float64)
qw, qx, qy, qz = qw / n, qx / n, qy / n, qz / n
# Standard quaternion -> rotation matrix
xx, yy, zz = qx * qx, qy * qy, qz * qz
xy, xz, yz = qx * qy, qx * qz, qy * qz
wx, wy, wz = qw * qx, qw * qy, qw * qz
return np.array(
[
[1.0 - 2.0 * (yy + zz), 2.0 * (xy - wz), 2.0 * (xz + wy)],
[2.0 * (xy + wz), 1.0 - 2.0 * (xx + zz), 2.0 * (yz - wx)],
[2.0 * (xz - wy), 2.0 * (yz + wx), 1.0 - 2.0 * (xx + yy)],
],
dtype=np.float64,
)
def _load_fixed_cam_front_calib(nuscenes_root: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Return (R_c2e, t_c2e, K) for CAM_FRONT.

    Uses the first calibrated_sensor record whose sensor_token matches the
    CAM_FRONT channel in the v1.0-trainval metadata tables.

    Raises RuntimeError when the channel or its calibration is missing/malformed.
    """
    meta_root = nuscenes_root / "v1.0-trainval"
    sensor_recs = _load_json(meta_root / "sensor.json")
    calib_recs = _load_json(meta_root / "calibrated_sensor.json")
    # sensor.json is a list of dicts; find the CAM_FRONT channel token.
    sensor_token = next((rec.get("token") for rec in sensor_recs if rec.get("channel") == "CAM_FRONT"), None)
    if sensor_token is None:
        raise RuntimeError("CAM_FRONT not found in sensor.json")
    calib_rec = next((rec for rec in calib_recs if rec.get("sensor_token") == sensor_token), None)
    if calib_rec is None:
        raise RuntimeError("No calibrated_sensor record found for CAM_FRONT")
    trans = np.asarray(calib_rec.get("translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
    quat = calib_rec.get("rotation", [1.0, 0.0, 0.0, 0.0])
    if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
        raise RuntimeError(f"Unexpected CAM_FRONT rotation quaternion: {quat}")
    rot = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))
    intrinsic = np.asarray(calib_rec.get("camera_intrinsic", np.eye(3).tolist()), dtype=np.float64)
    if intrinsic.shape != (3, 3):
        raise RuntimeError(f"Unexpected CAM_FRONT camera_intrinsic shape: {intrinsic.shape}")
    return rot, trans, intrinsic
def _load_fixed_cam_calibs(nuscenes_root: Path) -> Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Load fixed (R_c2e, t_c2e, K) for every nuScenes camera, keyed by channel
    name, from the v1.0-trainval sensor / calibrated_sensor tables.

    Sensors without a `camera_intrinsic` (lidar/radar) and malformed records
    are silently skipped.
    """
    meta_root = nuscenes_root / "v1.0-trainval"
    sensor_recs = _load_json(meta_root / "sensor.json")
    calib_recs = _load_json(meta_root / "calibrated_sensor.json")
    token_by_channel = {
        rec.get("channel"): rec.get("token")
        for rec in sensor_recs
        if isinstance(rec.get("channel"), str) and isinstance(rec.get("token"), str)
    }
    calib_by_token = {
        rec.get("sensor_token"): rec for rec in calib_recs if isinstance(rec.get("sensor_token"), str)
    }
    calibs: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = {}
    for channel, token in token_by_channel.items():
        rec = calib_by_token.get(token)
        if not rec or "camera_intrinsic" not in rec:
            continue  # not a camera
        trans = np.asarray(rec.get("translation", [0.0, 0.0, 0.0]), dtype=np.float64).reshape(3)
        quat = rec.get("rotation", [1.0, 0.0, 0.0, 0.0])
        if not (isinstance(quat, (list, tuple)) and len(quat) == 4):
            continue
        rot = _quat_to_rotmat(float(quat[0]), float(quat[1]), float(quat[2]), float(quat[3]))
        intrinsic = np.asarray(rec.get("camera_intrinsic", np.eye(3).tolist()), dtype=np.float64)
        if intrinsic.shape != (3, 3):
            continue
        calibs[channel] = (rot, trans, intrinsic)
    return calibs
def _paper_xy_to_nuscenes_ego_xyz(x_right: float, y_fwd: float, z_up: float = 0.0) -> np.ndarray:
# nuScenes ego: x forward, y left, z up
x_fwd = float(y_fwd)
y_left = float(-x_right)
return np.array([x_fwd, y_left, float(z_up)], dtype=np.float64)
def _project_ego_points_to_cam(
pts_ego_xyz: np.ndarray, # (N,3) in nuScenes ego
R_c2e: np.ndarray,
t_c2e: np.ndarray,
K: np.ndarray,
) -> np.ndarray:
"""
Project nuScenes ego-frame 3D points to pixel coords in CAM_FRONT.
Returns (M,2) pixels for points with z_cam > 0.
"""
# ego -> cam: p_cam = R_c2e^T (p_ego - t_c2e)
R_e2c = R_c2e.T
pts_cam = (R_e2c @ (pts_ego_xyz - t_c2e[None, :]).T).T # (N,3)
z = pts_cam[:, 2]
keep = z > 1e-3
pts_cam = pts_cam[keep]
if pts_cam.shape[0] == 0:
return np.zeros((0, 2), dtype=np.float64)
x = pts_cam[:, 0] / pts_cam[:, 2]
y = pts_cam[:, 1] / pts_cam[:, 2]
fx, fy = float(K[0, 0]), float(K[1, 1])
cx, cy = float(K[0, 2]), float(K[1, 2])
u = fx * x + cx
v = fy * y + cy
return np.stack([u, v], axis=1)
def _project_one_ego_point_to_cam(
pt_ego_xyz: np.ndarray, # (3,)
R_c2e: np.ndarray,
t_c2e: np.ndarray,
K: np.ndarray,
) -> Optional[Tuple[float, float, float]]:
"""Return (u, v, z_cam) or None if behind camera."""
pt_ego_xyz = np.asarray(pt_ego_xyz, dtype=np.float64).reshape(3)
R_e2c = R_c2e.T
pt_cam = R_e2c @ (pt_ego_xyz - t_c2e)
zc = float(pt_cam[2])
if zc <= 1e-3:
return None
fx, fy = float(K[0, 0]), float(K[1, 1])
cx, cy = float(K[0, 2]), float(K[1, 2])
u = fx * float(pt_cam[0] / zc) + cx
v = fy * float(pt_cam[1] / zc) + cy
return float(u), float(v), zc
def _draw_ego_bev(ax, *, color: str = "k"):
    """Draw the ego vehicle footprint, heading arrow, and an 'EGO' label on a BEV axis."""
    import matplotlib.patches as patches
    ego_w, ego_l = 1.85, 4.084  # footprint in meters (width, length)
    footprint = patches.Rectangle(
        (-ego_w / 2.0, -ego_l / 2.0),
        ego_w,
        ego_l,
        linewidth=1.2,
        edgecolor=color,
        facecolor="none",
        zorder=10,
    )
    ax.add_patch(footprint)
    # Arrow along +Y (forward in paper BEV coords).
    ax.arrow(0.0, 0.0, 0.0, ego_l * 0.8, head_width=0.6, head_length=0.8, fc=color, ec=color, zorder=11)
    ax.text(0.2, -0.6, "EGO", color=color, fontsize=8, zorder=12)
def _box_corners_xy(cx: float, cy: float, w: float, l: float, yaw: float) -> np.ndarray:
"""
Oriented rectangle corners in XY (paper coords).
yaw is treated as radians in the same XY frame (best-effort).
IMPORTANT: In our planning eval JSON, `yaw` behaves like a standard heading
angle measured from +X (right) axis (yaw-from-x). So:
- yaw = 0 => vehicle length points to +X (right)
- yaw = +pi/2 => vehicle length points to +Y (forward)
"""
c, s = math.cos(yaw), math.sin(yaw)
center = np.array([cx, cy], dtype=np.float64)
# Length axis (heading) and width axis (perpendicular), yaw-from-x
d_len = np.array([c, s], dtype=np.float64) * (l / 2.0)
d_wid = np.array([-s, c], dtype=np.float64) * (w / 2.0)
corners = np.stack(
[
center + d_len + d_wid,
center + d_len - d_wid,
center - d_len - d_wid,
center - d_len + d_wid,
],
axis=0,
)
return corners
def _title_case_command(cmd: str) -> str:
c = (cmd or "").strip().lower()
if c == "turn left":
return "Turn Left"
if c == "turn right":
return "Turn Right"
if c == "go straight":
return "Go Straight"
return cmd
def _short_cat(cat: str) -> str:
c = (cat or "").strip()
if not c:
return "obj"
tail = c.split(".")[-1]
mapping = {
"car": "car",
"truck": "truck",
"trailer": "trailer",
"bus": "bus",
"construction": "cveh",
"construction_vehicle": "cveh",
"pedestrian": "ped",
"trafficcone": "cone",
"traffic_cone": "cone",
"barrier": "barrier",
"motorcycle": "moto",
"bicycle": "bike",
}
return mapping.get(tail, tail[:10])
def _select_default_samples(data_items: List[Dict]) -> List[str]:
# prefer (turn right, go straight) like the paper figure
by_cmd: Dict[str, str] = {}
for it in data_items:
cmd = (it.get("ego_motion", {}) or {}).get("command")
if cmd and cmd not in by_cmd and it.get("id"):
by_cmd[str(cmd)] = str(it["id"])
out = []
if "turn right" in by_cmd:
out.append(by_cmd["turn right"])
if "go straight" in by_cmd:
out.append(by_cmd["go straight"])
if not out:
out = [str(it.get("id")) for it in data_items[:2] if it.get("id")]
return out[:2]
def parse_args():
    """Build and parse the command-line arguments for this visualization script."""
    parser = argparse.ArgumentParser()
    # Input / output paths (relative paths are resolved against the repo root).
    parser.add_argument("--eval_json", type=str, default="work_dirs/eval_nocap_ep2_plan50.json")
    parser.add_argument("--data_json", type=str, default="data/_eval_ep2_plan50.json")
    parser.add_argument("--data_root", type=str, default="/home/guoyuanbo/autodl-tmp/data/nuscenes")
    parser.add_argument("--out_png", type=str, default="work_dirs/atlas_planning_qualitative.png")
    # Sample selection and figure geometry.
    parser.add_argument("--sample_ids", type=str, nargs="*", default=None, help="Provide 1-2 sample ids; default picks turn right & go straight.")
    parser.add_argument("--bev_xlim", type=float, nargs=2, default=[-30.0, 30.0])
    parser.add_argument("--bev_ylim", type=float, nargs=2, default=[-10.0, 55.0])
    parser.add_argument("--dpi", type=int, default=200)
    # Boolean toggles, each with an explicit negation flag (defaults are on).
    parser.add_argument("--draw_gt_boxes", action="store_true", default=True)
    parser.add_argument("--no_draw_gt_boxes", action="store_false", dest="draw_gt_boxes")
    parser.add_argument("--overlay_on_front_cam", action="store_true", default=True)
    parser.add_argument("--no_overlay_on_front_cam", action="store_false", dest="overlay_on_front_cam")
    parser.add_argument("--highlight_front_visible_boxes", action="store_true", default=True)
    parser.add_argument("--no_highlight_front_visible_boxes", action="store_false", dest="highlight_front_visible_boxes")
    parser.add_argument("--max_front_labels", type=int, default=8, help="Max GT box labels shown on CAM_FRONT and BEV.")
    parser.add_argument("--max_cam_labels", type=int, default=4, help="Max GT labels per camera view (all 6 views).")
    parser.add_argument("--bev_only_visible", action="store_true", default=True, help="If set, BEV draws only FRONT* visible GT boxes.")
    parser.add_argument("--bev_show_all", action="store_false", dest="bev_only_visible", help="Draw all GT boxes faintly in BEV (plus highlighted).")
    # Detector-overlay options (StreamPETR nuScenes results).
    parser.add_argument(
        "--det_results_json",
        type=str,
        default="external/StreamPETR/val/work_dirs/streampetr_atlas_aligned/Tue_Feb_10_23_13_58_2026/pts_bbox/results_nusc.json",
        help="nuScenes detection results JSON (StreamPETR).",
    )
    parser.add_argument("--draw_det_pred_boxes", action="store_true", default=True)
    parser.add_argument("--no_draw_det_pred_boxes", action="store_false", dest="draw_det_pred_boxes")
    parser.add_argument("--det_score_thresh", type=float, default=0.3)
    parser.add_argument("--det_max_boxes", type=int, default=30)
    parser.add_argument("--det_label_topk", type=int, default=6, help="Label top-k detector boxes in BEV.")
    return parser.parse_args()
def main():
    """
    Render the paper-style qualitative figure: one row per selected sample,
    each row with a 2x3 camera mosaic (left) and a BEV panel (right), saved
    as a single PNG.
    """
    args = parse_args()
    repo = _REPO_ROOT
    eval_path = (repo / args.eval_json).resolve()
    data_path = (repo / args.data_json).resolve()
    nuscenes_root = Path(args.data_root).resolve()
    out_png = (repo / args.out_png).resolve()
    eval_obj = _load_json(eval_path)
    data_items = _load_json(data_path)
    # Build lookup maps: sample_id -> generated text, and sample_id -> data item.
    pred_text_by_id: Dict[str, str] = {}
    for rec in eval_obj.get("predictions", []):
        sid = str(rec.get("sample_id", ""))
        if not sid:
            continue
        pred_text_by_id[sid] = str(rec.get("generated_text", ""))
    item_by_id: Dict[str, Dict] = {str(it.get("id")): it for it in data_items if it.get("id")}
    sample_ids = list(args.sample_ids) if args.sample_ids else _select_default_samples(data_items)
    sample_ids = [sid for sid in sample_ids if sid in item_by_id]
    if not sample_ids:
        raise RuntimeError("No valid sample_ids found. Provide --sample_ids explicitly.")
    if len(sample_ids) > 2:
        sample_ids = sample_ids[:2]
    # Calibration for projecting onto CAM_FRONT (fixed per-channel extrinsics/intrinsics).
    cam_calibs: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = {}
    if args.overlay_on_front_cam:
        cam_calibs = _load_fixed_cam_calibs(nuscenes_root)
    from src.eval.metrics import parse_planning_output
    # Detector predictions (StreamPETR nuScenes results) are in GLOBAL coords.
    # We need ego pose to transform them into ego/paper coords for BEV plotting.
    det_mm = None
    det_f = None
    ego_pose_map = None
    det_results_path = (repo / args.det_results_json).resolve()
    if args.draw_det_pred_boxes and det_results_path.exists():
        try:
            ego_pose_map = _load_ego_pose_map(nuscenes_root)
            det_f = det_results_path.open("rb")
            det_mm = mmap.mmap(det_f.fileno(), 0, access=mmap.ACCESS_READ)
        except Exception:
            # Best-effort: silently skip the detector overlay if pose/mmap setup fails.
            det_mm = None
            if det_f is not None:
                det_f.close()
                det_f = None
    import matplotlib.pyplot as plt
    from matplotlib.gridspec import GridSpec, GridSpecFromSubplotSpec
    nrows = len(sample_ids)
    fig = plt.figure(figsize=(12.5, 6.5 * nrows), dpi=args.dpi)
    fig.suptitle("Atlas Planning — 6-Camera + BEV (Paper-style)", fontsize=16, y=0.99)
    outer = GridSpec(nrows, 2, figure=fig, width_ratios=[3.2, 1.4], wspace=0.10, hspace=0.18)
    from PIL import Image
    for r, sid in enumerate(sample_ids):
        item = item_by_id[sid]
        pred_text = pred_text_by_id.get(sid, "")
        plan = parse_planning_output(pred_text) if pred_text else None
        pred_wps = (plan or {}).get("waypoints", []) if plan else []
        gt_wps = (item.get("ego_motion", {}) or {}).get("waypoints", []) or []
        cmd = (item.get("ego_motion", {}) or {}).get("command", "")
        # Load images (paths in the data JSON may be relative to the nuScenes root).
        rel_paths: List[str] = list(item.get("image_paths", []) or [])
        if len(rel_paths) != 6:
            raise RuntimeError(f"sample {sid}: expected 6 image_paths, got {len(rel_paths)}")
        imgs = []
        for rp in rel_paths:
            p = Path(rp)
            if not p.is_absolute():
                p = nuscenes_root / rp
            imgs.append(Image.open(p).convert("RGB"))
        imgs = [imgs[i] for i in _IDX_REORDER]
        front_w, front_h = imgs[1].size  # CAM_FRONT in paper order
        # Left: 2x3 image mosaic
        left = GridSpecFromSubplotSpec(2, 3, subplot_spec=outer[r, 0], wspace=0.02, hspace=0.06)
        ax_imgs = []
        for i in range(6):
            ax = fig.add_subplot(left[i // 3, i % 3])
            ax.imshow(imgs[i])
            # Lock image axis limits so later overlays don't autoscale-shrink the image.
            w_i, h_i = imgs[i].size
            ax.set_xlim(0, w_i)
            ax.set_ylim(h_i, 0)
            ax.set_title(CAM_ORDER_PAPER[i], fontsize=9)
            ax.axis("off")
            ax_imgs.append(ax)
        # Collect GT boxes that are visible in each FRONT* camera (by projected centers).
        boxes = item.get("gt_boxes_3d", []) or []

        def _visible_for_channel(channel: str, img_w: int, img_h: int):
            # Returns (distance, box_index, category, x_r, y_f, z_u, u, v) tuples,
            # sorted by distance, for GT centers that project inside the image.
            if not (args.highlight_front_visible_boxes and args.overlay_on_front_cam):
                return []
            if channel not in cam_calibs:
                return []
            R_c2e, t_c2e, K = cam_calibs[channel]
            out = []
            for bi, b in enumerate(boxes):
                if not isinstance(b, dict) or "world_coords" not in b:
                    continue
                wc = b.get("world_coords", [0.0, 0.0, 0.0])
                if not (isinstance(wc, (list, tuple)) and len(wc) >= 3):
                    continue
                x_r, y_f, z_u = float(wc[0]), float(wc[1]), float(wc[2])
                pt_ego = _paper_xy_to_nuscenes_ego_xyz(x_r, y_f, z_u)
                uv = _project_one_ego_point_to_cam(pt_ego, R_c2e, t_c2e, K)
                if uv is None:
                    continue
                u, v, _zc = uv
                if 0.0 <= u <= float(img_w) and 0.0 <= v <= float(img_h):
                    d = float(math.hypot(x_r, y_f))
                    out.append((d, bi, str(b.get("category", "")), x_r, y_f, z_u, u, v))
            out.sort(key=lambda t: t[0])
            return out

        vis_by_ch = {
            "CAM_FRONT_LEFT": _visible_for_channel("CAM_FRONT_LEFT", imgs[0].size[0], imgs[0].size[1]),
            "CAM_FRONT": _visible_for_channel("CAM_FRONT", imgs[1].size[0], imgs[1].size[1]),
            "CAM_FRONT_RIGHT": _visible_for_channel("CAM_FRONT_RIGHT", imgs[2].size[0], imgs[2].size[1]),
            "CAM_BACK_LEFT": _visible_for_channel("CAM_BACK_LEFT", imgs[3].size[0], imgs[3].size[1]),
            "CAM_BACK": _visible_for_channel("CAM_BACK", imgs[4].size[0], imgs[4].size[1]),
            "CAM_BACK_RIGHT": _visible_for_channel("CAM_BACK_RIGHT", imgs[5].size[0], imgs[5].size[1]),
        }
        # Deduplicate across channels (a box can be visible in several cameras).
        visible_union_all = []
        if any(vis_by_ch.get(ch) for ch in vis_by_ch.keys()):
            seen = set()
            for ch in ("CAM_FRONT_LEFT", "CAM_FRONT", "CAM_FRONT_RIGHT", "CAM_BACK_LEFT", "CAM_BACK", "CAM_BACK_RIGHT"):
                for tup in vis_by_ch.get(ch, []):
                    bi = tup[1]
                    if bi in seen:
                        continue
                    seen.add(bi)
                    visible_union_all.append(tup)
        visible_union_all.sort(key=lambda t: t[0])
        visible_union = visible_union_all[: max(int(args.max_front_labels), 0)]
        # Overlay trajectory on CAM_FRONT (middle of top row in paper order)
        if args.overlay_on_front_cam and pred_wps and "CAM_FRONT" in cam_calibs:
            R_c2e, t_c2e, K = cam_calibs["CAM_FRONT"]
            ax_front = ax_imgs[1]
            # Pred
            pts_pred = np.array([_paper_xy_to_nuscenes_ego_xyz(x, y, 0.0) for x, y in pred_wps], dtype=np.float64)
            uv_pred = _project_ego_points_to_cam(pts_pred, R_c2e, t_c2e, K)
            if uv_pred.shape[0] >= 2:
                ax_front.plot(uv_pred[:, 0], uv_pred[:, 1], "-o", color="#00C2FF", linewidth=2.0, markersize=4.0, alpha=0.95)
            # GT (dashed)
            if gt_wps:
                pts_gt = np.array([_paper_xy_to_nuscenes_ego_xyz(x, y, 0.0) for x, y in gt_wps], dtype=np.float64)
                uv_gt = _project_ego_points_to_cam(pts_gt, R_c2e, t_c2e, K)
                if uv_gt.shape[0] >= 2:
                    ax_front.plot(uv_gt[:, 0], uv_gt[:, 1], "--o", color="white", linewidth=2.0, markersize=3.5, alpha=0.85)
        # Highlight GT centers on the three FRONT* cameras (helps validate BEV↔camera consistency).
        if args.highlight_front_visible_boxes and args.overlay_on_front_cam:
            cam_axes = {
                "CAM_FRONT_LEFT": ax_imgs[0],
                "CAM_FRONT": ax_imgs[1],
                "CAM_FRONT_RIGHT": ax_imgs[2],
                "CAM_BACK_LEFT": ax_imgs[3],
                "CAM_BACK": ax_imgs[4],
                "CAM_BACK_RIGHT": ax_imgs[5],
            }
            for ch, ax in cam_axes.items():
                vis = (vis_by_ch.get(ch, []) or [])[: max(int(args.max_cam_labels), 0)]
                if not vis:
                    continue
                for _d, _bi, cat, _x, _y, _z, u, v in vis:
                    ax.scatter([u], [v], s=26, c="#FF4D4D", edgecolors="black", linewidths=0.6, zorder=20)
                    ax.text(
                        u + 6.0,
                        v - 6.0,
                        _short_cat(cat),
                        color="white",
                        fontsize=8,
                        fontweight="bold",
                        ha="left",
                        va="bottom",
                        zorder=21,
                        bbox=dict(boxstyle="round,pad=0.15", facecolor="black", edgecolor="none", alpha=0.6),
                    )
        # Right: BEV
        ax_bev = fig.add_subplot(outer[r, 1])
        ax_bev.set_title("BEV", fontsize=12)
        ax_bev.set_xlabel("X (m) — right", fontsize=9)
        ax_bev.set_ylabel("Y (m) — forward", fontsize=9)
        ax_bev.set_aspect("equal", adjustable="box")
        ax_bev.grid(True, linewidth=0.4, alpha=0.35)
        ax_bev.set_xlim(float(args.bev_xlim[0]), float(args.bev_xlim[1]))
        ax_bev.set_ylim(float(args.bev_ylim[0]), float(args.bev_ylim[1]))
        _draw_ego_bev(ax_bev, color="black")
        # Draw GT boxes (paper-style context)
        if args.draw_gt_boxes:
            boxes = item.get("gt_boxes_3d", []) or []
            # When debugging camera↔BEV consistency, draw only the boxes that are visible
            # in the shown camera views to avoid confusing off-screen objects.
            if args.bev_only_visible and visible_union_all:
                boxes_to_draw = [boxes[bi] for _d, bi, _cat, *_rest in visible_union_all if 0 <= bi < len(boxes)]
            else:
                boxes_to_draw = boxes
            for b in boxes_to_draw:
                if not isinstance(b, dict) or "world_coords" not in b:
                    continue
                wc = b.get("world_coords", [0.0, 0.0, 0.0])
                cx, cy = float(wc[0]), float(wc[1])
                w = float(b.get("w", 1.8))
                l = float(b.get("l", 4.0))
                yaw = float(b.get("yaw", 0.0))
                corners = _box_corners_xy(cx, cy, w, l, yaw)
                poly = np.vstack([corners, corners[0:1]])
                ax_bev.plot(poly[:, 0], poly[:, 1], color="#FF5A5A", linewidth=0.8, alpha=0.50)
            # Re-draw FRONT* visible boxes thicker + label them in BEV.
            if args.highlight_front_visible_boxes and visible_union:
                for _d, bi, cat, x_r, y_f, _z, _u, _v in visible_union:
                    if bi >= len(boxes):
                        continue
                    b = boxes[bi]
                    wc = b.get("world_coords", [0.0, 0.0, 0.0])
                    cx, cy = float(wc[0]), float(wc[1])
                    w = float(b.get("w", 1.8))
                    l = float(b.get("l", 4.0))
                    yaw = float(b.get("yaw", 0.0))
                    corners = _box_corners_xy(cx, cy, w, l, yaw)
                    poly = np.vstack([corners, corners[0:1]])
                    ax_bev.plot(poly[:, 0], poly[:, 1], color="#FF2E2E", linewidth=1.8, alpha=0.95)
                    ax_bev.text(
                        cx,
                        cy,
                        _short_cat(cat),
                        fontsize=7.5,
                        color="#FF2E2E",
                        ha="center",
                        va="center",
                        zorder=30,
                        bbox=dict(boxstyle="round,pad=0.12", facecolor="white", edgecolor="none", alpha=0.55),
                    )
        # Draw detector predicted boxes (StreamPETR) in BEV.
        det_handle = None
        if args.draw_det_pred_boxes and det_mm is not None and ego_pose_map is not None:
            R_e2g_t = None
            t_e2g_t = None
            if sid in ego_pose_map:
                R_e2g_t, t_e2g_t = ego_pose_map[sid]
            if R_e2g_t is not None and t_e2g_t is not None:
                dets = _extract_results_list_for_token(det_mm, sid)
                # Filter & transform
                det_plot = []
                xlo, xhi = float(args.bev_xlim[0]), float(args.bev_xlim[1])
                ylo, yhi = float(args.bev_ylim[0]), float(args.bev_ylim[1])
                for d in dets:
                    try:
                        score = float(d.get("detection_score", 0.0))
                    except Exception:
                        score = 0.0
                    if score < float(args.det_score_thresh):
                        continue
                    tr = d.get("translation", None)
                    sz = d.get("size", None)
                    rot = d.get("rotation", None)
                    if not (isinstance(tr, (list, tuple)) and len(tr) >= 3):
                        continue
                    if not (isinstance(sz, (list, tuple)) and len(sz) >= 3):
                        continue
                    if not (isinstance(rot, (list, tuple)) and len(rot) == 4):
                        continue
                    p_g = np.asarray([float(tr[0]), float(tr[1]), float(tr[2])], dtype=np.float64)
                    # global -> ego
                    p_e = (R_e2g_t.T @ (p_g - t_e2g_t)).reshape(3)
                    # ego -> paper
                    x_p = float(-p_e[1])
                    y_p = float(p_e[0])
                    z_p = float(p_e[2])
                    if not (xlo <= x_p <= xhi and ylo <= y_p <= yhi):
                        continue
                    # orientation: global -> ego -> paper yaw
                    R_box_g = _quat_to_rotmat(float(rot[0]), float(rot[1]), float(rot[2]), float(rot[3]))
                    R_box_e = R_e2g_t.T @ R_box_g
                    yaw_e = float(math.atan2(R_box_e[1, 0], R_box_e[0, 0]))  # yaw-from-x in ego
                    yaw_p = yaw_e + math.pi / 2.0  # convert to paper yaw-from-x
                    w = float(sz[0])
                    l = float(sz[1])
                    cat = str(d.get("detection_name", "obj"))
                    det_plot.append((score, cat, x_p, y_p, z_p, w, l, yaw_p))
                # Keep the highest-scoring boxes only.
                det_plot.sort(key=lambda t: -t[0])
                det_plot = det_plot[: max(int(args.det_max_boxes), 0)]
                det_color = "#00A65A"  # green
                for score, cat, x_p, y_p, _z, w, l, yaw_p in det_plot:
                    corners = _box_corners_xy(x_p, y_p, w, l, yaw_p)
                    poly = np.vstack([corners, corners[0:1]])
                    ax_bev.plot(poly[:, 0], poly[:, 1], color=det_color, linewidth=1.1, alpha=0.65, linestyle="--")
                # Label top-k predictions to show "front vehicles"
                for score, cat, x_p, y_p, _z, w, l, yaw_p in det_plot[: max(int(args.det_label_topk), 0)]:
                    ax_bev.text(
                        x_p,
                        y_p,
                        _short_cat(cat),
                        fontsize=7.2,
                        color=det_color,
                        ha="center",
                        va="center",
                        zorder=31,
                        bbox=dict(boxstyle="round,pad=0.12", facecolor="white", edgecolor="none", alpha=0.55),
                    )
                # Legend handle for detector boxes
                from matplotlib.lines import Line2D
                det_handle = Line2D([0], [0], color=det_color, lw=1.5, linestyle="--", label="Det Pred boxes")
        # Plot trajectories
        if gt_wps:
            gt_arr = np.asarray(gt_wps, dtype=np.float64)
            ax_bev.plot(gt_arr[:, 0], gt_arr[:, 1], "--o", color="black", linewidth=1.8, markersize=4.0, alpha=0.85, label="GT traj")
        if pred_wps:
            pr_arr = np.asarray(pred_wps, dtype=np.float64)
            ax_bev.plot(pr_arr[:, 0], pr_arr[:, 1], "-o", color="#00C2FF", linewidth=2.4, markersize=4.5, alpha=0.95, label="Pred traj")
        # Legend: add GT boxes handle to avoid confusion (boxes are NOT predicted here).
        handles, _labels = ax_bev.get_legend_handles_labels()
        if args.draw_gt_boxes:
            import matplotlib.patches as mpatches
            handles.append(mpatches.Patch(facecolor="none", edgecolor="#FF5A5A", linewidth=1.2, label="GT boxes"))
        if det_handle is not None:
            handles.append(det_handle)
        ax_bev.legend(handles=handles, loc="upper left", fontsize=9, frameon=True)
        # Command label (like paper: bottom-left)
        ax_bev.text(
            0.02,
            0.02,
            _title_case_command(str(cmd)),
            transform=ax_bev.transAxes,
            fontsize=12,
            fontweight="bold",
            ha="left",
            va="bottom",
            color="black",
            bbox=dict(boxstyle="round,pad=0.25", facecolor="white", edgecolor="none", alpha=0.65),
        )
    out_png.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out_png, bbox_inches="tight")
    plt.close(fig)
    print(f"[saved] {out_png}")
    # Release the detector results mmap and its backing file handle.
    if det_mm is not None:
        try:
            det_mm.close()
        except Exception:
            pass
    if det_f is not None:
        try:
            det_f.close()
        except Exception:
            pass
# Script entry point.
if __name__ == "__main__":
    main()