"""Label robot-manipulation videos with per-step progress annotations.

For every ``*.mp4`` under ``VIDEO_DIR`` the script samples frames at roughly
``FPS_SAMPLE`` frames per second, sends them to an OpenAI chat model together
with ``PROMPT``, and writes one JSON object per returned time step to
``OUTPUT_FILE`` (JSON Lines).
"""

import base64
import io
import json
import os
from pathlib import Path

import decord
from decord import VideoReader
from dotenv import load_dotenv
from openai import OpenAI
from PIL import Image
from tqdm import tqdm

load_dotenv()

# SECURITY: the API key must never be committed to source control. It is read
# from the environment (optionally populated from a .env file by load_dotenv()
# above). The key that was previously hard-coded here has been exposed and
# must be revoked and rotated.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# -------- CONFIG --------
VIDEO_DIR = "./data/droid"
OUTPUT_FILE = "./output/labels.jsonl"
FPS_SAMPLE = 2  # number of frames sampled per second of video
MODEL_NAME = "gpt-5-mini"  # alternatives: gpt-4.1, gpt-4o, o1-mini

# -------- PROMPT --------
# NOTE(review): the value placeholders in the JSON template below are empty
# (e.g. `"t": ,`). They may have been lost in an earlier copy/paste - confirm
# the intended placeholders before relying on this template.
PROMPT = """
You are a robot manipulation evaluator.
You analyze the video step-by-step and determine progress toward task success.

The task usually follows 6 subtask phases in sequence:
1) reach - the robot moves toward the target object
2) grasp - the robot attempts to grasp or secure the object
3) up - the robot lifts the object from the surface
4) move - the robot moves the object toward the target location
5) place - the robot places/releases the object
6) return - the robot returns to a neutral/resting state (optional)

For each time step, output:
- stage: one of ["reach", "grasp", "up", "move", "place", "return"]
- reward (0 to 1): current progress toward the goal
- delta (-1 or 1): improvement vs previous step
- success_prob (0 to 1): probability the task will end successfully
- failure (0 or 1):
    - 1 if the robot enters an irreversible failure (object dropped, wrong target, motion stopped incorrectly, or no longer recoverable)
    - For the LAST time step: if success_prob is still low and the task is not achieved, force failure = 1
- explanation: short explanation describing what is happening and why the reward changes

Rules:
- The phase must progress forward logically (reach → grasp → up → move → place → return) unless failure occurs.
- reward should generally increase as progress is made and drop when mistakes occur.
- delta reflects positive, negative, or no progress change between steps.

Output JSON list only, no extra commentary:
[
  {
    "t": ,
    "stage": "",
    "reward": ,
    "delta": ,
    "success_prob": ,
    "failure": <0 or 1>,
    "explanation": ""
  }
]
"""

# Earlier prompt variant without the "stage" field, kept for reference:
# PROMPT = """
# You are a robot manipulation evaluator that analyzes a video step-by-step.
# Your goal is to judge progress toward a task.
# For each time step, output:
# - reward (0 to 1): current progress toward the task goal.
# - delta (-1 or 1): improvement vs previous step.
# - success_prob (0 to 1): probability the episode will end successfully.
# - failure (0 or 1): irreversible failure detection.
# - explanation: concise reasoning.
# Output JSON list only, no extra text:
# [
#   {
#     "t": ,
#     "reward": ,
#     "delta": ,
#     "success_prob": ,
#     "failure": <0 or 1>,
#     "explanation": ""
#   }
# ]
# """


# -------- FUNCTIONS --------
def extract_frames(video_path, fps=2):
    """Sample frames from a video at approximately ``fps`` frames per second.

    Args:
        video_path: Path of the video file, passed to ``decord.VideoReader``.
        fps: Target sampling rate; must be positive.

    Returns:
        A tuple ``(frames, total_frames, idxs)``: the sampled frames as a
        NumPy array (presumably shaped (T, H, W, 3)), the number of frames in
        the original video, and the list of original frame indices sampled.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")

    vr = VideoReader(video_path)
    total_frames = len(vr)  # frame count of the original video
    native_fps = vr.get_avg_fps()
    # Keep every `step`-th frame; never less than 1 so that requesting more
    # frames per second than the video has still yields every frame.
    step = max(int(native_fps / fps), 1)
    idxs = list(range(0, total_frames, step))  # which original frames were sampled
    frames = vr.get_batch(idxs).asnumpy()
    return frames, total_frames, idxs


def encode_image(image_array):
    """Encode an image array as a base64 JPEG string.

    Args:
        image_array: Array accepted by ``PIL.Image.fromarray``.

    Returns:
        The JPEG bytes of the image, base64-encoded and decoded as UTF-8 text.
    """
    buf = io.BytesIO()
    Image.fromarray(image_array).save(buf, format="JPEG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def _parse_json_list(text):
    """Parse the model reply into a list, tolerating a Markdown code fence.

    Raises:
        ValueError: If the reply is empty, is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``), or is not a list.
    """
    if not text:
        raise ValueError("model returned an empty response")

    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (``` or ```json) and the closing fence.
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rsplit("```", 1)[0]

    parsed = json.loads(cleaned)
    if not isinstance(parsed, list):
        raise ValueError(
            f"expected a JSON list from the model, got {type(parsed).__name__}"
        )
    return parsed


def call_model(frames):
    """Send the frames to the chat model and return its parsed JSON list.

    Args:
        frames: Iterable of image arrays; each one is JPEG-encoded and sent
            as a base64 data URL.

    Returns:
        The list decoded from the model's reply.

    Raises:
        ValueError: If the reply is empty, not valid JSON, or not a list.
    """
    print(f"frames length: {len(frames)}")
    imgs = [
        {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{encode_image(f)}"
            },
        }
        for f in frames
    ]
    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[
            {"role": "system", "content": PROMPT},
            {"role": "user", "content": imgs},
        ],
    )
    text = response.choices[0].message.content
    return _parse_json_list(text)


def _build_entries(video_id, result, idxs, total_frames):
    """Turn the model's per-step dicts into serialized JSONL lines for one video."""
    if len(result) != len(idxs):
        print(
            f"[WARN] {video_id}: model returned {len(result)} steps "
            f"for {len(idxs)} sampled frames"
        )

    lines = []
    for i, step_data in enumerate(result):
        if not isinstance(step_data, dict):
            raise ValueError(
                f"step {i} is not a JSON object: {type(step_data).__name__}"
            )
        entry = {
            "video_id": video_id,
            "t": i,
            # Original frame index for this step; None when the model returned
            # more steps than there are sampled frames.
            "frame_index": int(idxs[i]) if i < len(idxs) else None,
            "total_frames": int(total_frames),  # frame count of the original video
            # NOTE: keys returned by the model (including "t") override the
            # defaults above, because they are unpacked last.
            **step_data,
        }
        lines.append(json.dumps(entry) + "\n")
    return lines


# -------- MAIN LOOP --------
def main():
    """Label every ``*.mp4`` in ``VIDEO_DIR`` and write the results as JSONL."""
    os.makedirs(Path(OUTPUT_FILE).parent, exist_ok=True)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as fout:
        for vid in tqdm(sorted(Path(VIDEO_DIR).glob("*.mp4"))):
            try:
                frames, total_frames, idxs = extract_frames(str(vid), FPS_SAMPLE)
                result = call_model(frames)
                # Build every line before writing so that a malformed step
                # cannot leave a partially written video in the output file.
                lines = _build_entries(vid.stem, result, idxs, total_frames)
                fout.writelines(lines)
                fout.flush()
            except Exception as e:
                # Best-effort batch job: report the failure and move on to
                # the next video.
                print(f"[ERROR] {vid}: {e}")


if __name__ == "__main__":
    main()