Atlas-online-0318 / scripts /align_lane_to_nuscenes.py
guoyb0's picture
Upload code snapshot 0318
f693366 verified
#!/usr/bin/env python3
"""Align OpenLane-V2 lane data timestamps/ids to nuScenes keyframes.
OpenLane-V2 uses camera-based timestamps (~37ms offset from nuScenes lidar
timestamps). This script maps each lane sample to its nearest nuScenes
keyframe so that the lane data can be mixed into scene-sequential training
with det/plan/caption data sharing the same (scene_token, timestamp, id).
"""
import json
import sys
from collections import defaultdict
from pathlib import Path
def build_keyframe_lookup(det_json_path: str) -> defaultdict:
    """Group the keyframes found in a detection JSON file by scene.

    Args:
        det_json_path: Path to a JSON file containing a list of records,
            each providing "scene_token", "timestamp" and "id" keys.

    Returns:
        A ``defaultdict(list)`` mapping every scene token to its
        ``(timestamp, id)`` tuples, sorted in ascending tuple order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a record lacks one of the required keys.
    """
    # JSON is UTF-8 by specification; do not rely on the locale's default
    # encoding, which differs between platforms.
    with open(det_json_path, encoding="utf-8") as f:
        det_data = json.load(f)

    lookup = defaultdict(list)
    for record in det_data:
        lookup[record["scene_token"]].append(
            (record["timestamp"], record["id"])
        )

    # Sort each scene's keyframes in place so they are ordered by timestamp.
    for keyframes in lookup.values():
        keyframes.sort()
    return lookup
def find_nearest_keyframe(scene_kfs, lane_ts, max_diff_us=100_000):
    """Pick the keyframe whose timestamp is closest to ``lane_ts``.

    Args:
        scene_kfs: Iterable of ``(timestamp, id)`` pairs for one scene.
        lane_ts: Timestamp of the lane sample to align.
        max_diff_us: Largest accepted absolute timestamp difference, in the
            same unit as the timestamps (the parameter name suggests
            microseconds).

    Returns:
        ``(timestamp, id, diff)`` of the closest keyframe, or
        ``(None, None, diff)`` when the closest keyframe is further away
        than ``max_diff_us``. For an empty ``scene_kfs`` the diff is
        ``float("inf")``. On equal distances the earliest entry in
        ``scene_kfs`` wins.
    """
    # min() returns the first minimal element, which matches a strict
    # "less than" scan over the candidates.
    nearest = min(
        scene_kfs,
        key=lambda kf: abs(kf[0] - lane_ts),
        default=None,
    )
    if nearest is None:
        return None, None, float("inf")

    kf_ts, kf_id = nearest
    offset = abs(kf_ts - lane_ts)
    if offset > max_diff_us:
        return None, None, offset
    return kf_ts, kf_id, offset
def main():
    """Align lane samples to their nearest keyframes and save them as JSON.

    Optional positional command-line arguments:
        1. Detection JSON path (default "data/atlas_nuscenes_train.json").
        2. Lane JSON path
           (default "data/openlane_subsetB_lane_train_4pt.json").
        3. Output JSON path. Defaults to the lane path with "_aligned"
           inserted before a trailing ".json"; if the lane path has no
           ".json" suffix, "_aligned.json" is appended instead so the
           input file is never overwritten by default.

    Lane samples whose scene is absent from the detection file, or whose
    nearest keyframe is too far away, are dropped. Kept samples get their
    "timestamp" and "id" replaced by those of the matched keyframe.
    Summary statistics are printed to stdout.
    """
    det_json = sys.argv[1] if len(sys.argv) > 1 else "data/atlas_nuscenes_train.json"
    lane_json = sys.argv[2] if len(sys.argv) > 2 else "data/openlane_subsetB_lane_train_4pt.json"
    if len(sys.argv) > 3:
        out_json = sys.argv[3]
    else:
        # Only rewrite the trailing extension: str.replace() would also
        # touch ".json" inside directory names, and would return the input
        # path unchanged (overwriting it) when there is no ".json" at all.
        suffix = ".json"
        stem = lane_json[: -len(suffix)] if lane_json.endswith(suffix) else lane_json
        out_json = stem + "_aligned.json"

    print(f"Det: {det_json}")
    print(f"Lane: {lane_json}")
    print(f"Out: {out_json}")

    keyframe_lookup = build_keyframe_lookup(det_json)
    # JSON is UTF-8 by specification; avoid the locale default encoding.
    with open(lane_json, encoding="utf-8") as f:
        lane_data = json.load(f)

    aligned = []
    skipped_no_scene = 0
    skipped_too_far = 0
    diffs = []
    for lane in lane_data:
        scene = lane["scene_token"]
        if scene not in keyframe_lookup:
            skipped_no_scene += 1
            continue
        new_ts, new_id, diff = find_nearest_keyframe(
            keyframe_lookup[scene], lane["timestamp"]
        )
        if new_ts is None:
            skipped_too_far += 1
            continue
        # Overwrite in place so the lane sample shares the keyframe's
        # timestamp and id.
        lane["timestamp"] = new_ts
        lane["id"] = new_id
        aligned.append(lane)
        diffs.append(diff)

    # ensure_ascii=False emits raw non-ASCII characters, so the file must
    # be opened as UTF-8 explicitly to stay valid on every platform.
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(aligned, f, ensure_ascii=False)

    print("\nResults:")
    print(f" Aligned: {len(aligned)}")
    print(f" Skipped (no scene): {skipped_no_scene}")
    print(f" Skipped (too far): {skipped_too_far}")
    if diffs:
        avg_diff = sum(diffs) / len(diffs)
        max_diff = max(diffs)
        print(f" Avg offset: {avg_diff:.0f} us ({avg_diff/1e6:.4f}s)")
        print(f" Max offset: {max_diff} us ({max_diff/1e6:.4f}s)")
    print(f"\nSaved to: {out_json}")
# Run the alignment only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()