"""ARC-AGI-2 Video Answer Evaluator.

Extracts the test output grid from the last frame of a generated video,
then compares it against the ground-truth answer.

Color recovery pipeline:
  1. Match pixel RGB against the canonical ARC_COLORS palette → permuted color index
  2. Apply inverse permutation → original color index
  3. Compare with ground truth

Usage:
    python video_evaluate.py --video_dir videos --data_dir data --output results.json
"""
|
|
| import json |
| import random |
| import argparse |
| from pathlib import Path |
|
|
| from collections import defaultdict |
| import cv2 |
| import numpy as np |
| from tqdm import tqdm |
|
|
| |
|
|
# Canonical ARC palette as RGB triples: row i is the color drawn for color
# index i. Pixels sampled from a frame are matched to the nearest row.
ARC_COLORS = np.array([
    [0x00, 0x00, 0x00],  # 0: black
    [0x00, 0x74, 0xD9],  # 1: blue
    [0xFF, 0x41, 0x36],  # 2: red
    [0x2E, 0xCC, 0x40],  # 3: green
    [0xFF, 0xDC, 0x00],  # 4: yellow
    [0xAA, 0xAA, 0xAA],  # 5: grey
    [0xF0, 0x12, 0xBE],  # 6: magenta
    [0xFF, 0x85, 0x1B],  # 7: orange
    [0x7F, 0xDB, 0xFF],  # 8: light blue
    [0x87, 0x0C, 0x25],  # 9: maroon
], dtype=np.uint8)
|
|
|
|
| |
|
|
def generate_color_permutation(seed: int, n_colors: int = 10) -> list[int]:
    """Reproduce the same permutation used during video generation.

    A dedicated ``random.Random(seed)`` instance shuffles
    ``[0, 1, ..., n_colors - 1]``, so the result depends only on the
    arguments and never on the global random state.

    Args:
        seed: Seed for the private random generator.
        n_colors: Number of color indices to permute (default 10).

    Returns:
        A list containing each index in ``range(n_colors)`` exactly once.
    """
    rng = random.Random(seed)
    perm = list(range(n_colors))
    rng.shuffle(perm)
    return perm
|
|
|
|
def invert_permutation(perm: list[int]) -> list[int]:
    """Compute inverse permutation: inv[perm[i]] = i.

    Args:
        perm: A permutation of ``range(len(perm))``.

    Returns:
        The list ``inv`` such that ``inv[perm[i]] == i`` for every ``i``.

    Raises:
        ValueError: If ``perm`` contains an out-of-range or repeated value.
            Without this check a negative entry would wrap around through
            negative indexing and a duplicate would silently overwrite an
            earlier slot, producing a wrong inverse.
    """
    size = len(perm)
    unset = -1  # sentinel: no source index has been assigned to this slot yet
    inv = [unset] * size
    for i, p in enumerate(perm):
        if not 0 <= p < size or inv[p] != unset:
            raise ValueError(f"Not a permutation of range({size}): {perm}")
        inv[p] = i
    return inv
|
|
|
|
| |
|
|
def compute_test_output_bbox(
    task: dict,
    canvas_h: int,
    canvas_w: int,
    *,
    padding: int = 12,
    outer_margin: int = 16,
    label_h: int = 20,
) -> dict:
    """Compute pixel bounding box of the test output grid region.

    Replicates _compute_layout + render_frame positioning from video_generate.py.
    The canvas is treated as two rows of cells with one column per training
    pair plus one extra column; the returned box is the bottom-row cell of the
    last column, minus the label strip at its top.

    Args:
        task: Task dict providing ``task["train"]`` (only its length is used)
            and ``task["test"][0]["output"]`` (only its shape is used).
        canvas_h: Canvas height in pixels.
        canvas_w: Canvas width in pixels.
        padding: Gap in pixels between neighbouring cells.
        outer_margin: Margin in pixels reserved on each canvas edge when
            sizing the cells.
        label_h: Height in pixels of the label strip at the top of a cell.

    Returns:
        Dict with keys ``grid_rows``, ``grid_cols``, ``grid_x0``, ``grid_y0``,
        ``grid_w`` and ``grid_h``.

    Raises:
        ValueError: If the test output grid is empty or its rows differ in
            length.
    """
    test_grid = task["test"][0]["output"]
    grid_rows = len(test_grid)
    grid_cols = len(test_grid[0]) if grid_rows else 0
    if grid_rows == 0 or grid_cols == 0:
        raise ValueError("Test output grid is empty")
    if any(len(row) != grid_cols for row in test_grid):
        raise ValueError("Test output grid rows have inconsistent lengths")

    n_cols = len(task["train"]) + 1
    n_rows = 2

    # Cell size: the space left after margins and inter-cell gaps, split
    # evenly with integer division.
    usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding
    usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding
    cell_w = usable_w // n_cols
    cell_h = usable_h // n_rows

    # Integer division can leave spare pixels, so the block of cells is
    # re-centred on the canvas rather than anchored at outer_margin.
    total_block_w = cell_w * n_cols + (n_cols - 1) * padding
    total_block_h = cell_h * n_rows + (n_rows - 1) * padding
    margin_x = (canvas_w - total_block_w) // 2
    margin_y = (canvas_h - total_block_h) // 2

    # Target cell: last column, second row.
    col = n_cols - 1
    x0 = margin_x + col * (cell_w + padding)
    y0 = margin_y + cell_h + padding

    return {
        "grid_rows": grid_rows,
        "grid_cols": grid_cols,
        "grid_x0": x0,
        "grid_y0": y0 + label_h,
        "grid_w": cell_w,
        "grid_h": cell_h - label_h,
    }
|
|
|
|
| |
|
|
def _drain_frames(cap):
    """Read from ``cap`` until decoding fails; return the last frame or None."""
    last = None
    while True:
        ok, frame = cap.read()
        if not ok:
            return last
        last = frame


def extract_last_frame(video_path: str) -> np.ndarray:
    """Extract the last frame from a video as an RGB numpy array.

    The reported frame count and frame seeking depend on the container and
    codec, so the seek is only a shortcut: after seeking, any frames that
    still decode are drained, and if nothing could be read the file is
    reopened and decoded from the start.

    Args:
        video_path: Path of the video file.

    Returns:
        The final decodable frame, converted from OpenCV's BGR order to RGB.

    Raises:
        FileNotFoundError: If the video cannot be opened.
        RuntimeError: If no frame can be decoded.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise FileNotFoundError(f"Cannot open video: {video_path}")

        last_frame = None
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total > 0:
            # Fast path: jump to the reported final index, then keep reading
            # in case the count was an underestimate.
            cap.set(cv2.CAP_PROP_POS_FRAMES, total - 1)
            last_frame = _drain_frames(cap)

        if last_frame is None:
            # Slow path: the count was missing or the seek failed. Reopen so
            # decoding restarts from a clean state at the first frame.
            cap.release()
            cap = cv2.VideoCapture(video_path)
            last_frame = _drain_frames(cap)
    finally:
        cap.release()

    if last_frame is None:
        raise RuntimeError(f"Failed to read last frame from {video_path}")
    return cv2.cvtColor(last_frame, cv2.COLOR_BGR2RGB)
|
|
|
|
| |
|
|
def extract_grid_from_frame(
    frame: np.ndarray,
    grid_x0: int,
    grid_y0: int,
    grid_w: int,
    grid_h: int,
    grid_rows: int,
    grid_cols: int,
    palette: "np.ndarray | None" = None,
) -> list[list[int]]:
    """Extract ARC grid by sampling cell centers and matching to a palette.

    For every cell, a patch of up to 3x3 pixels around the cell center is
    averaged and assigned the index of the nearest palette row (squared
    Euclidean distance in RGB). By default the canonical ARC_COLORS palette is
    used, so the returned indices are the permuted color values as rendered
    in the video.

    Args:
        frame: RGB image (H, W, 3).
        grid_x0, grid_y0: Top-left of grid area (below label).
        grid_w, grid_h: Grid area dimensions.
        grid_rows, grid_cols: Expected grid shape.
        palette: Optional (N, 3) array of RGB reference colors; defaults to
            ARC_COLORS.

    Returns:
        Grid of permuted color indices (apply inverse perm to get originals).

    Raises:
        ValueError: If the grid shape is not positive or the frame is empty.
    """
    if grid_rows <= 0 or grid_cols <= 0:
        raise ValueError(
            f"Grid shape must be positive, got {grid_rows}x{grid_cols}"
        )
    frame_h, frame_w = frame.shape[:2]
    if frame_h == 0 or frame_w == 0:
        raise ValueError("Cannot sample colors from an empty frame")

    # Convert the palette once, in float, so the per-cell distance needs no
    # repeated casts and the patch mean is not truncated before comparison.
    colors = np.asarray(ARC_COLORS if palette is None else palette, dtype=np.float64)

    cell_h = grid_h / grid_rows
    cell_w = grid_w / grid_cols

    grid = []
    for r in range(grid_rows):
        # Clamp centers into the frame: an out-of-frame center would give an
        # empty patch whose mean is NaN.
        cy = min(max(int(grid_y0 + (r + 0.5) * cell_h), 0), frame_h - 1)
        row = []
        for c in range(grid_cols):
            cx = min(max(int(grid_x0 + (c + 0.5) * cell_w), 0), frame_w - 1)
            patch = frame[max(0, cy - 1): cy + 2, max(0, cx - 1): cx + 2]
            avg = patch.mean(axis=(0, 1))
            dists = np.sum((colors - avg) ** 2, axis=1)
            row.append(int(np.argmin(dists)))
        grid.append(row)
    return grid
|
|
|
|
| |
|
|
def evaluate_video(
    video_path: str,
    task: dict,
    perm: list[int],
    canvas_h: int = 720,
    canvas_w: int = 1280,
) -> dict:
    """Evaluate a single video against ground truth.

    Pipeline:
      1. Extract last frame (full answer revealed)
      2. Locate test output region via layout math
      3. Sample cell centers → match to ARC_COLORS → get permuted color indices
      4. Apply inverse permutation → recover original color indices
      5. Compare with ground truth

    Args:
        video_path: Path of the video to evaluate.
        task: Task dict; ``task["test"][0]["output"]`` is the ground truth.
        perm: Color permutation that was applied when the video was rendered.
        canvas_h: Canvas height used for the layout math.
        canvas_w: Canvas width used for the layout math.

    Returns:
        Dict with 'correct', 'predicted_grid', 'ground_truth', 'pixel_accuracy'.
    """
    frame = extract_last_frame(video_path)

    # The region is derived from canvas_h / canvas_w only; the decoded frame's
    # own size is not consulted here, so the two are expected to agree.
    bbox = compute_test_output_bbox(task, canvas_h, canvas_w)

    # Indices read from the frame are in the permuted color space.
    permuted_grid = extract_grid_from_frame(frame, **bbox)

    # Map the permuted indices back to the task's original color indices.
    inv = invert_permutation(perm)
    predicted = [[inv[cell] for cell in row] for row in permuted_grid]

    gt = task["test"][0]["output"]
    correct = predicted == gt

    # Cell-wise agreement; max(..., 1) guards the division for an empty grid.
    gt_flat = [c for row in gt for c in row]
    pred_flat = [c for row in predicted for c in row]
    n_match = sum(a == b for a, b in zip(gt_flat, pred_flat))
    pixel_acc = n_match / max(len(gt_flat), 1)

    return {
        "correct": correct,
        "predicted_grid": predicted,
        "ground_truth": gt,
        "pixel_accuracy": pixel_acc,
    }
|
|
|
|
| |
|
|
def _probe_resolution(video_file: Path) -> tuple[int, int]:
    """Return the (height, width) OpenCV reports for ``video_file``."""
    cap = cv2.VideoCapture(str(video_file))
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()
    return height, width


def evaluate_all(
    video_dir: str = "videos",
    data_dir: str = "data",
    output_file: str = "results.json",
) -> None:
    """Evaluate all videos against ground-truth tasks.

    Recovers the color permutation from the seed in the filename
    ({task_id}_{seed}.mp4) using the same RNG as video_generate.py.

    Args:
        video_dir: Directory scanned for ``*.mp4`` files.
        data_dir: Directory whose ``training`` and ``evaluation``
            subdirectories hold the task ``*.json`` files.
        output_file: Path of the JSON summary that is written.
    """
    video_path = Path(video_dir)
    data_path = Path(data_dir)

    # Index task files by stem; on a duplicate id the later subdirectory wins.
    task_files: dict[str, Path] = {}
    for subdir in ("training", "evaluation"):
        d = data_path / subdir
        if d.exists():
            for fp in d.glob("*.json"):
                task_files[fp.stem] = fp

    videos = sorted(video_path.glob("*.mp4"))
    if not videos:
        print(f"No videos found in {video_dir}")
        return

    # The first video's resolution is reported and kept as a fallback for
    # videos whose own resolution cannot be read.
    canvas_h, canvas_w = _probe_resolution(videos[0])
    print(f"Detected resolution: {canvas_h}x{canvas_w}")

    results = {}
    total_correct = 0
    total_count = 0
    # Several videos (one per seed) can share a task, so parse each file once.
    task_cache: dict[str, dict] = {}

    for vp in tqdm(videos, desc="Evaluating"):
        stem = vp.stem
        parts = stem.rsplit("_", 1)
        if len(parts) != 2:
            tqdm.write(f"Skip {stem}: filename is not {{task_id}}_{{seed}}")
            continue
        task_id, seed_str = parts

        if task_id not in task_files:
            tqdm.write(f"Skip {stem}: task not found")
            continue

        # A non-numeric suffix must skip this file, not abort the whole run.
        try:
            seed = int(seed_str)
        except ValueError:
            tqdm.write(f"Skip {stem}: seed is not an integer")
            continue

        task = task_cache.get(task_id)
        if task is None:
            with open(task_files[task_id], encoding="utf-8") as f:
                task = json.load(f)
            task_cache[task_id] = task

        if not task.get("test") or "output" not in task["test"][0]:
            tqdm.write(f"Skip {stem}: task has no test output")
            continue

        perm = generate_color_permutation(seed)

        # Use this video's own resolution so one unreadable or differently
        # sized file cannot skew the layout used for the others.
        vid_h, vid_w = _probe_resolution(vp)
        if vid_h <= 0 or vid_w <= 0:
            vid_h, vid_w = canvas_h, canvas_w

        # Per-video boundary: record the failure and continue with the rest.
        try:
            result = evaluate_video(str(vp), task, perm, vid_h, vid_w)
        except Exception as e:
            tqdm.write(f"Error {stem}: {e}")
            results[stem] = {"error": str(e), "task_id": task_id}
            continue

        results[stem] = {
            "correct": result["correct"],
            "pixel_accuracy": result["pixel_accuracy"],
            "task_id": task_id,
            "seed": seed_str,
        }
        total_count += 1
        if result["correct"]:
            total_correct += 1

    acc = total_correct / max(total_count, 1)

    # Group pixel accuracies by task so every task weighs the same in the
    # mean regardless of how many seeds it has.
    task_pixels: dict[str, list[float]] = defaultdict(list)
    for v in results.values():
        if "pixel_accuracy" in v:
            task_pixels[v["task_id"]].append(v["pixel_accuracy"])

    task_means = {
        tid: sum(accs) / len(accs) for tid, accs in sorted(task_pixels.items())
    }
    per_task_pixel_acc = {tid: round(m, 4) for tid, m in task_means.items()}
    # Average the unrounded per-task means; round only the reported value.
    mean_pixel_acc = sum(task_means.values()) / max(len(task_means), 1)

    summary = {
        "total_videos": total_count,
        "correct": total_correct,
        "accuracy": round(acc, 4),
        "mean_pixel_accuracy": round(mean_pixel_acc, 4),
        "per_task_pixel_accuracy": per_task_pixel_acc,
        "results": results,
    }

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)

    print(f"\nResults: {total_correct}/{total_count} correct ({acc:.2%})")
    print(f"Mean pixel accuracy (per-task avg): {summary['mean_pixel_accuracy']:.2%}")
    print(f"Saved to {output_file}")
|
|
|
|
| |
|
|
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the evaluator."""
    parser = argparse.ArgumentParser(description="ARC Video Evaluator")
    parser.add_argument(
        "--video_dir", type=str, default="videos",
        help="directory containing the generated .mp4 files",
    )
    parser.add_argument(
        "--data_dir", type=str, default="data",
        help="directory containing the task JSON subdirectories",
    )
    parser.add_argument(
        "--output", type=str, default="results.json",
        help="path of the JSON summary to write",
    )
    return parser


def main(argv=None) -> None:
    """Parse ``argv`` (``sys.argv[1:]`` when None) and run the evaluation."""
    args = build_arg_parser().parse_args(argv)
    evaluate_all(args.video_dir, args.data_dir, args.output)


if __name__ == "__main__":
    main()