"""Write a truncated copy of a validation info pickle.

The script loads the pickle at ``predroot``, sorts its ``"infos"`` list by
``"token"``, keeps only the frames whose ``"frame_idx"`` does not exceed
``MAX_FRAME_SEQ``, verifies that the kept frames of each scene advance by
exactly ``FRAME_INTERVAL``, and dumps the result (plus every other top-level
key of the input, untouched) to a new pickle whose name embeds
``MAX_FRAME_SEQ``.
"""
import pickle as pkl
from typing import Dict, List

import mmcv


def sort_data(data_dict: Dict):
    """Sort key for frame records: the record's ``"token"`` value."""
    return data_dict["token"]


# Expected difference between the ``frame_idx`` of consecutive kept frames
# belonging to the same scene.
FRAME_INTERVAL = 10

# formula to calculate runtime
# num_frame / 8 FPS * 1.66 (map eval) =

# for val pkl during training, 4000 is the MAX due to time limit for autoresume
# MAX_FRAME_SEQ = 4000       # total 10400 frames in 25 sequences

# for val pkl during testing, 4000 is the MAX due to time limit for autoresume
# NOTE(review): the comment above says 4000 but the active value is 10000 --
# confirm which limit is intended for testing.
MAX_FRAME_SEQ = 10000       # total 10400 frames in 25 sequences


def _select_frames(infos: List[Dict], max_frame: int, frame_interval: int) -> List[Dict]:
    """Return the records of ``infos`` whose ``frame_idx`` is <= ``max_frame``.

    ``infos`` is walked in the given order. For every kept record its token,
    scene token and frame index are printed. The first kept record of a scene
    (detected by a change of ``"scene_token"``) sets the expected frame index;
    each following kept record of that scene must be exactly
    ``frame_interval`` larger than the previous one.

    Raises:
        AssertionError: if a kept record's ``frame_idx`` does not match the
            expected value. This is raised explicitly instead of via an
            ``assert`` statement so the check still runs under ``python -O``.
    """
    kept: List[Dict] = []
    active_scene = None
    expected_idx = None

    for record in infos:
        record_scene = record['scene_token']
        record_idx = record['frame_idx']

        # Frames beyond the limit are dropped without touching the
        # sequence-tracking state.
        if record_idx > max_frame:
            continue

        print(record['token'])
        print(record_scene)
        print(record_idx)

        if active_scene is None or record_scene != active_scene:
            # First kept frame of a new scene: restart the expected counter.
            expected_idx = record_idx
            active_scene = record_scene
        else:
            # Same scene as the previous kept frame: advance by one interval.
            expected_idx += frame_interval

        if expected_idx != record_idx:
            raise AssertionError(f'frame id wrong, {expected_idx} vs {record_idx}')

        kept.append(record)

    return kept


predroot = '/mnt/hdd2/datasets/carla_1.0/carla_data_0414/data_val_nusc_format.pkl'
# NOTE(review): the original comment here listed the keys as bbox_results,
# occ_results_computed, planning_results_computed, but the code below reads
# the "infos" key -- confirm the actual schema of this file.
pkl_data: Dict = mmcv.load(predroot)
info_data: List[Dict] = pkl_data["infos"]
info_data.sort(key=sort_data)

new_info_data: List[Dict] = _select_frames(info_data, MAX_FRAME_SEQ, FRAME_INTERVAL)
print(f'total number of frames is {len(new_info_data)}')

# Build the output dict: the filtered "infos" first, then every other
# top-level key of the input copied over in its original order.
new_pkl_data = {
    'infos': new_info_data,
}
new_pkl_data.update(
    (key, value) for key, value in pkl_data.items() if key != 'infos'
)

# saving
output_path = f'/mnt/hdd2/datasets/carla_1.0/carla_data_0414/data_val_nusc_format_partial_{MAX_FRAME_SEQ}.pkl'
with open(output_path, "wb") as f:
    pkl.dump(new_pkl_data, f)