import json
from pathlib import Path

import imageio
import numpy as np
import tensorflow_datasets as tfds
from tqdm import tqdm

# Output directory for extracted failed episodes (video + metadata per episode).
FAILED_DIR = Path("./droid_failed")
FAILED_DIR.mkdir(exist_ok=True, parents=True)


def is_failed_episode(episode) -> bool:
    """Return True if this episode ended without reaching a terminal state.

    Heuristic: walk to the last step (the one with is_last == True); if that
    step is not marked is_terminal, the episode finished without success and
    is treated as a failure case.

    Args:
        episode: a TFDS DROID episode dict containing a "steps" dataset.
    """
    last_step = None
    for step in episode["steps"].as_numpy_iterator():
        last_step = step
    # An empty episode has no final step to classify; treat as not failed
    # (the original code would have crashed on None here).
    if last_step is None:
        return False
    return bool(last_step["is_last"]) and not bool(last_step["is_terminal"])


def save_failed_episode(episode, idx) -> None:
    """Save one failed episode as an MP4 plus a JSON metadata file.

    The video shows the wrist camera and first exterior camera side by side.
    Files are written to FAILED_DIR as failed_{idx:06d}.mp4 / .json.

    Args:
        episode: a TFDS DROID episode dict containing a "steps" dataset.
        idx: integer episode index used to name the output files.
    """
    frames = []
    meta = {
        "episode_id": idx,
        "language_instruction": None,
        "steps": [],
    }

    for t, step in enumerate(episode["steps"].as_numpy_iterator()):
        # Capture the language instruction from the first step that has one.
        if meta["language_instruction"] is None:
            instr = step.get("language_instruction", b"")
            meta["language_instruction"] = (
                instr.decode("utf-8") if isinstance(instr, bytes) else str(instr)
            )

        # Concatenate wrist and exterior views horizontally into one frame.
        wrist = step["observation"]["wrist_image_left"]
        ext1 = step["observation"]["exterior_image_1_left"]
        frames.append(np.concatenate([wrist, ext1], axis=1))

        meta["steps"].append({
            "t": t,
            "action": step["action"].tolist(),
            "joint_position": step["action_dict"]["joint_position"].tolist(),
            "is_first": bool(step["is_first"]),
            "is_last": bool(step["is_last"]),
            "is_terminal": bool(step["is_terminal"]),
        })

    # Save video; ensure the writer is closed even if appending a frame fails.
    video_path = FAILED_DIR / f"failed_{idx:06d}.mp4"
    writer = imageio.get_writer(video_path, fps=15, codec='libx264')
    try:
        for frame in frames:
            writer.append_data(frame)
    finally:
        writer.close()

    # Save metadata
    json_path = FAILED_DIR / f"failed_{idx:06d}.json"
    with open(json_path, "w") as f:
        json.dump(meta, f, indent=2)

    print(f"[FAIL] Saved → {video_path}")


def main() -> None:
    """Scan the full DROID train split and extract every failed episode."""
    # ---- Load full dataset (streaming) ----
    ds = tfds.load(
        "droid",
        data_dir="/playpen-ssd/dataset/",
        split="train",
        shuffle_files=False,
    )

    # tf.data cardinality can be UNKNOWN (-1) or INFINITE (-2); pass None to
    # tqdm in that case instead of a bogus negative total.
    num_total = None
    if hasattr(ds, "cardinality"):
        cardinality = int(ds.cardinality().numpy())
        if cardinality >= 0:
            num_total = cardinality

    count = 0
    for idx, episode in tqdm(enumerate(ds), total=num_total, desc="Scanning Episodes"):
        if is_failed_episode(episode):
            save_failed_episode(episode, idx)
            count += 1

    print(f"\n✅ Completed. Extracted {count} failed episodes.")


if __name__ == "__main__":
    main()