# errm / archive / old_scripts / video_process.py
# Uploaded by yuffish via the upload-large-folder tool (commit 517964a, verified).
import tensorflow_datasets as tfds
import numpy as np
import json, imageio
from pathlib import Path
# Destination directory for the per-episode .mp4 / .json outputs; created
# up front so extract_episode can write into it unconditionally.
OUTPUT_DIR = Path("./droid_processed")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
def extract_episode(episode, idx):
    """Dump one DROID episode to an MP4 video plus a JSON metadata file.

    Parameters
    ----------
    episode : TFDS episode; ``episode["steps"]`` is a tf.data.Dataset of steps.
    idx : int
        Episode index, used only to build the output filenames.

    Returns
    -------
    tuple[Path, Path]
        ``(video_path, json_path)`` of the files written under OUTPUT_DIR.
    """
    frames = []
    meta = {
        "episode_id": idx,
        "language_instruction": None,
        "steps": [],
    }
    # "steps" is a tf.data.Dataset; iterate it as plain numpy values.
    for t, step in enumerate(episode["steps"].as_numpy_iterator()):
        # The instruction repeats on every step; keep looking until a
        # non-empty one shows up (an empty string on step 0 must not mask
        # a later non-empty value — the old `is None` check latched "").
        if not meta["language_instruction"]:
            instr = step.get("language_instruction", b"")
            text = instr.decode("utf-8") if isinstance(instr, bytes) else str(instr)
            if text:
                meta["language_instruction"] = text
        # Side-by-side wrist + exterior-camera frame (concatenated on width).
        wrist = step["observation"]["wrist_image_left"]
        ext1 = step["observation"]["exterior_image_1_left"]
        frames.append(np.concatenate([wrist, ext1], axis=1))
        meta["steps"].append({
            "t": t,
            "action": step["action"].tolist(),
            "joint_position": step["action_dict"]["joint_position"].tolist(),
            "is_first": bool(step["is_first"]),
            "is_last": bool(step["is_last"]),
            "is_terminal": bool(step["is_terminal"]),
        })
    # Weak success label: any terminal step counts as success.
    meta["success"] = any(s["is_terminal"] for s in meta["steps"])

    # Save video. The writer is a context manager, so the file is finalized
    # and the handle released even if encoding a frame raises (the previous
    # code leaked the writer on error).
    video_path = OUTPUT_DIR / f"episode_{idx:06d}.mp4"
    with imageio.get_writer(video_path, fps=15, codec='libx264') as writer:
        for frame in frames:
            writer.append_data(frame)

    # Save JSON metadata next to the video.
    json_path = OUTPUT_DIR / f"episode_{idx:06d}.json"
    with open(json_path, "w") as f:
        json.dump(meta, f, indent=2)

    print(f"[OK] Saved → {video_path}")
    return video_path, json_path
# ---- Run on dataset ----
# Load the DROID-100 train split and process a small prefix of it.
dataset = tfds.load("droid_100", data_dir="/playpen-ssd/dataset/", split="train")
# Smoke test: only the first 20 episodes.
for episode_index, episode in enumerate(dataset.take(20)):
    extract_episode(episode, episode_index)