"""ARC-AGI-2 Video Answer Evaluator. Extracts the test output grid from the last frame of a generated video, then compares it against the ground-truth answer. Color recovery pipeline: 1. Match pixel RGB against the canonical ARC_COLORS palette → permuted color index 2. Apply inverse permutation → original color index 3. Compare with ground truth Usage: python video_evaluate.py --video_dir videos --data_dir data --output results.json """ import json import random import argparse from pathlib import Path from collections import defaultdict import cv2 import numpy as np from tqdm import tqdm # ── ARC Color Palette (RGB) ─────────────────────────────────────────────────── ARC_COLORS = np.array([ [0x00, 0x00, 0x00], # 0: black [0x00, 0x74, 0xD9], # 1: blue [0xFF, 0x41, 0x36], # 2: red [0x2E, 0xCC, 0x40], # 3: green [0xFF, 0xDC, 0x00], # 4: yellow [0xAA, 0xAA, 0xAA], # 5: grey [0xF0, 0x12, 0xBE], # 6: magenta [0xFF, 0x85, 0x1B], # 7: orange [0x7F, 0xDB, 0xFF], # 8: light blue [0x87, 0x0C, 0x25], # 9: maroon ], dtype=np.uint8) # ── Color Permutation Utilities ──────────────────────────────────────────────── def generate_color_permutation(seed: int) -> list[int]: """Reproduce the same permutation used during video generation.""" rng = random.Random(seed) perm = list(range(10)) rng.shuffle(perm) return perm def invert_permutation(perm: list[int]) -> list[int]: """Compute inverse permutation: inv[perm[i]] = i.""" inv = [0] * len(perm) for i, p in enumerate(perm): inv[p] = i return inv # ── Layout Computation (mirrors video_generate.py exactly) ───────────────────── def compute_test_output_bbox(task: dict, canvas_h: int, canvas_w: int) -> dict: """Compute pixel bounding box of the test output grid region. Replicates _compute_layout + render_frame positioning from video_generate.py. """ n_cols = len(task["train"]) + 1 n_rows = 2 padding = 12 outer_margin = 16 label_h = 20 usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding cell_w = usable_w // n_cols cell_h = usable_h // n_rows total_block_w = cell_w * n_cols + (n_cols - 1) * padding total_block_h = cell_h * n_rows + (n_rows - 1) * padding margin_x = (canvas_w - total_block_w) // 2 margin_y = (canvas_h - total_block_h) // 2 # Test output: last column, second row col = n_cols - 1 x0 = margin_x + col * (cell_w + padding) y0 = margin_y + cell_h + padding test_out = np.array(task["test"][0]["output"]) gr, gc = test_out.shape return { "grid_rows": gr, "grid_cols": gc, "grid_x0": x0, "grid_y0": y0 + label_h, "grid_w": cell_w, "grid_h": cell_h - label_h, } # ── Frame Extraction ─────────────────────────────────────────────────────────── def extract_last_frame(video_path: str) -> np.ndarray: """Extract the last frame from a video as an RGB numpy array.""" cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise FileNotFoundError(f"Cannot open video: {video_path}") total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) cap.set(cv2.CAP_PROP_POS_FRAMES, max(0, total - 1)) ret, frame = cap.read() cap.release() if not ret: raise RuntimeError(f"Failed to read last frame from {video_path}") return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # ── Grid Extraction ──────────────────────────────────────────────────────────── def extract_grid_from_frame( frame: np.ndarray, grid_x0: int, grid_y0: int, grid_w: int, grid_h: int, grid_rows: int, grid_cols: int, ) -> list[list[int]]: """Extract ARC grid by sampling cell centers and matching to ARC_COLORS. Always matches against the canonical ARC_COLORS palette. The returned indices are the permuted color values as rendered in the video. Args: frame: RGB image (H, W, 3). grid_x0, grid_y0: Top-left of grid area (below label). grid_w, grid_h: Grid area dimensions. grid_rows, grid_cols: Expected grid shape. Returns: Grid of permuted color indices (apply inverse perm to get originals). """ cell_h = grid_h / grid_rows cell_w = grid_w / grid_cols grid = [] for r in range(grid_rows): row = [] cy = int(grid_y0 + (r + 0.5) * cell_h) for c in range(grid_cols): cx = int(grid_x0 + (c + 0.5) * cell_w) # 3x3 patch average for codec artifact robustness patch = frame[max(0, cy - 1): cy + 2, max(0, cx - 1): cx + 2] avg = patch.mean(axis=(0, 1)).astype(np.uint8) dists = np.sum((ARC_COLORS.astype(int) - avg.astype(int)) ** 2, axis=1) row.append(int(np.argmin(dists))) grid.append(row) return grid # ── Evaluation ───────────────────────────────────────────────────────────────── def evaluate_video( video_path: str, task: dict, perm: list[int], canvas_h: int = 720, canvas_w: int = 1280, ) -> dict: """Evaluate a single video against ground truth. Pipeline: 1. Extract last frame (full answer revealed) 2. Locate test output region via layout math 3. Sample cell centers → match to ARC_COLORS → get permuted color indices 4. Apply inverse permutation → recover original color indices 5. Compare with ground truth Returns: Dict with 'correct', 'predicted_grid', 'ground_truth', 'pixel_accuracy'. """ frame = extract_last_frame(video_path) bbox = compute_test_output_bbox(task, canvas_h, canvas_w) # Step 1: extract permuted color indices from rendered pixels permuted_grid = extract_grid_from_frame(frame, **bbox) # Step 2: invert permutation to recover original values inv = invert_permutation(perm) predicted = [[inv[cell] for cell in row] for row in permuted_grid] # Step 3: compare with ground truth gt = task["test"][0]["output"] correct = (predicted == gt) gt_flat = [c for row in gt for c in row] pred_flat = [c for row in predicted for c in row] n_match = sum(a == b for a, b in zip(gt_flat, pred_flat)) pixel_acc = n_match / max(len(gt_flat), 1) return { "correct": correct, "predicted_grid": predicted, "ground_truth": gt, "pixel_accuracy": pixel_acc, } # ── Batch Evaluation ─────────────────────────────────────────────────────────── def evaluate_all( video_dir: str = "videos", data_dir: str = "data", output_file: str = "results.json", ) -> None: """Evaluate all videos against ground-truth tasks. Recovers the color permutation from the seed in the filename ({task_id}_{seed}.mp4) using the same RNG as video_generate.py. """ video_path = Path(video_dir) data_path = Path(data_dir) # Build task file lookup task_files: dict[str, Path] = {} for subdir in ["training", "evaluation"]: d = data_path / subdir if d.exists(): for fp in d.glob("*.json"): task_files[fp.stem] = fp videos = sorted(video_path.glob("*.mp4")) if not videos: print(f"No videos found in {video_dir}") return # Auto-detect resolution from first video cap = cv2.VideoCapture(str(videos[0])) canvas_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) canvas_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() print(f"Detected resolution: {canvas_h}x{canvas_w}") results = {} total_correct = 0 total_count = 0 for vp in tqdm(videos, desc="Evaluating"): stem = vp.stem parts = stem.rsplit("_", 1) if len(parts) != 2: continue task_id, seed_str = parts if task_id not in task_files: tqdm.write(f"Skip {stem}: task not found") continue with open(task_files[task_id]) as f: task = json.load(f) if not task.get("test") or "output" not in task["test"][0]: continue # Recover the exact permutation from seed seed = int(seed_str) perm = generate_color_permutation(seed) try: result = evaluate_video(str(vp), task, perm, canvas_h, canvas_w) results[stem] = { "correct": result["correct"], "pixel_accuracy": result["pixel_accuracy"], "task_id": task_id, "seed": seed_str, } total_count += 1 if result["correct"]: total_correct += 1 except Exception as e: tqdm.write(f"Error {stem}: {e}") results[stem] = {"error": str(e), "task_id": task_id} acc = total_correct / max(total_count, 1) # Per-task pixel accuracy aggregation task_pixels: dict[str, list[float]] = defaultdict(list) for v in results.values(): if "pixel_accuracy" in v: task_pixels[v["task_id"]].append(v["pixel_accuracy"]) per_task_pixel_acc = { tid: round(sum(accs) / len(accs), 4) for tid, accs in sorted(task_pixels.items()) } summary = { "total_videos": total_count, "correct": total_correct, "accuracy": round(acc, 4), "mean_pixel_accuracy": round( sum(per_task_pixel_acc.values()) / max(len(per_task_pixel_acc), 1), 4 ), "per_task_pixel_accuracy": per_task_pixel_acc, "results": results, } with open(output_file, "w") as f: json.dump(summary, f, indent=2) print(f"\nResults: {total_correct}/{total_count} correct ({acc:.2%})") print(f"Mean pixel accuracy (per-task avg): {summary['mean_pixel_accuracy']:.2%}") print(f"Saved to {output_file}") # ── CLI ──────────────────────────────────────────────────────────────────────── if __name__ == "__main__": p = argparse.ArgumentParser(description="ARC Video Evaluator") p.add_argument("--video_dir", type=str, default="videos") p.add_argument("--data_dir", type=str, default="data") p.add_argument("--output", type=str, default="results.json") args = p.parse_args() evaluate_all(args.video_dir, args.data_dir, args.output)