"""Export DROID episodes from a TFDS dataset to MP4 videos plus JSON metadata.

For each episode, the wrist and first exterior camera images are concatenated
side by side into one frame, written out as an H.264 video, and the per-step
actions/flags (plus a weak success label) are dumped to a JSON file.
"""

import json
from pathlib import Path

import imageio
import numpy as np
import tensorflow_datasets as tfds

OUTPUT_DIR = Path("./droid_processed")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)


def extract_episode(episode, idx):
    """Convert one TFDS episode into an MP4 video and a JSON metadata file.

    Args:
        episode: A TFDS episode dict whose "steps" entry is a tf.data.Dataset
            of per-timestep dicts (observation images, action, flags).
        idx: Episode index, used only to build the output filenames.

    Returns:
        Tuple ``(video_path, json_path)`` of the files written to OUTPUT_DIR.
    """
    frames = []
    meta = {"episode_id": idx, "language_instruction": None, "steps": []}

    # "steps" is a tf.data.Dataset (a generator of dicts); iterate as numpy.
    for t, step in enumerate(episode["steps"].as_numpy_iterator()):
        # The language instruction repeats on every step; capture it once.
        if meta["language_instruction"] is None:
            instr = step.get("language_instruction", b"")
            meta["language_instruction"] = (
                instr.decode("utf-8") if isinstance(instr, bytes) else str(instr)
            )

        # Side-by-side composite: wrist camera | exterior camera 1.
        # NOTE(review): assumes both images share height and dtype — TODO confirm.
        wrist = step["observation"]["wrist_image_left"]
        ext1 = step["observation"]["exterior_image_1_left"]
        frames.append(np.concatenate([wrist, ext1], axis=1))

        meta["steps"].append({
            "t": t,
            "action": step["action"].tolist(),
            "joint_position": step["action_dict"]["joint_position"].tolist(),
            "is_first": bool(step["is_first"]),
            "is_last": bool(step["is_last"]),
            "is_terminal": bool(step["is_terminal"]),
        })

    # Weak success label: any terminal step is treated as success.
    meta["success"] = any(s["is_terminal"] for s in meta["steps"])

    # Save video. Close the writer even if append_data raises, so the file
    # handle is not leaked and ffmpeg can finalize the container
    # (fix: the original never closed the writer on error).
    video_path = OUTPUT_DIR / f"episode_{idx:06d}.mp4"
    writer = imageio.get_writer(video_path, fps=15, codec='libx264')
    try:
        for frame in frames:
            writer.append_data(frame)
    finally:
        writer.close()

    # Save JSON metadata alongside the video.
    json_path = OUTPUT_DIR / f"episode_{idx:06d}.json"
    with open(json_path, "w") as f:
        json.dump(meta, f, indent=2)

    print(f"[OK] Saved → {video_path}")
    return video_path, json_path


# ---- Run on dataset ----
if __name__ == "__main__":
    ds = tfds.load("droid_100", data_dir="/playpen-ssd/dataset/", split="train")
    for idx, episode in enumerate(ds.take(20)):  # sample run: first 20 episodes
        extract_episode(episode, idx)