"""ARC-AGI-2 Task Video Generator. Generates animated videos for ARC tasks that progressively reveal test outputs. Supports random color permutation for data augmentation. Renders directly to a target resolution with auto-calculated grid layout. Outputs train.jsonl / test.jsonl with stratified splits. Usage: python video_generate.py --data_dir data --output_dir videos \ --n_frames 5 --m_frames 5 --k_rate 1.0 \ --repeat_num 3 --max_frames None --fps 15 \ --resolution 720 1280 --train_ratio 0.9 """ import json import csv import argparse import random import math from pathlib import Path from tqdm import tqdm import cv2 import numpy as np # ── ARC Color Palette (RGB) ─────────────────────────────────────────────────── ARC_COLORS = np.array([ [0x00, 0x00, 0x00], # 0: black [0x00, 0x74, 0xD9], # 1: blue [0xFF, 0x41, 0x36], # 2: red [0x2E, 0xCC, 0x40], # 3: green [0xFF, 0xDC, 0x00], # 4: yellow [0xAA, 0xAA, 0xAA], # 5: grey [0xF0, 0x12, 0xBE], # 6: magenta [0xFF, 0x85, 0x1B], # 7: orange [0x7F, 0xDB, 0xFF], # 8: light blue [0x87, 0x0C, 0x25], # 9: maroon ], dtype=np.uint8) GRID_LINE_COLOR = (200, 200, 200) LABEL_COLOR = (40, 40, 40) BG_COLOR = (255, 255, 255) UNREVEALED_COLOR = np.array([220, 220, 220], dtype=np.uint8) # ── Color Permutation ────────────────────────────────────────────────────────── def generate_color_permutation(seed: int) -> list[int]: """Generate a deterministic color permutation from a seed.""" rng = random.Random(seed) perm = list(range(10)) rng.shuffle(perm) return perm def apply_color_permutation(grid: list[list[int]], perm: list[int]) -> list[list[int]]: """Apply color permutation to a grid (nested list).""" return [[perm[cell] for cell in row] for row in grid] def permute_task(task: dict, perm: list[int]) -> dict: """Return a deep-copied task with all grids color-permuted.""" new_task = {"train": [], "test": []} for pair in task["train"]: new_task["train"].append({ "input": apply_color_permutation(pair["input"], perm), "output": apply_color_permutation(pair["output"], perm), }) for pair in task["test"]: new_pair = {"input": apply_color_permutation(pair["input"], perm)} if "output" in pair: new_pair["output"] = apply_color_permutation(pair["output"], perm) new_task["test"].append(new_pair) return new_task # ── Direct Canvas Grid Rendering ─────────────────────────────────────────────── def _render_grid_to_region( canvas: np.ndarray, grid: np.ndarray, x0: int, y0: int, w: int, h: int, label: str, rows_revealed: int | None = None, ) -> None: """Render a single ARC grid into a rectangular region of the canvas.""" label_h = 20 grid_y0 = y0 + label_h grid_h = h - label_h grid_w = w if grid_h <= 0 or grid_w <= 0: return gr, gc = grid.shape cell_h = grid_h / gr cell_w = grid_w / gc for r in range(gr): for c in range(gc): cy = int(grid_y0 + r * cell_h) cx = int(x0 + c * cell_w) cy2 = int(grid_y0 + (r + 1) * cell_h) cx2 = int(x0 + (c + 1) * cell_w) if rows_revealed is not None and r >= rows_revealed: color = tuple(UNREVEALED_COLOR.tolist()) else: color = tuple(ARC_COLORS[grid[r, c]].tolist()) cv2.rectangle(canvas, (cx, cy), (cx2, cy2), color, -1) for r in range(gr + 1): ly = int(grid_y0 + r * cell_h) cv2.line(canvas, (x0, ly), (x0 + grid_w, ly), GRID_LINE_COLOR, 1) for c in range(gc + 1): lx = int(x0 + c * cell_w) cv2.line(canvas, (lx, grid_y0), (lx, grid_y0 + grid_h), GRID_LINE_COLOR, 1) font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.8 thickness = 1 (tw, th), _ = cv2.getTextSize(label, font, font_scale, thickness) tx = x0 + (w - tw) // 2 ty = y0 + label_h - 4 cv2.putText(canvas, label, (tx, ty), font, font_scale, LABEL_COLOR, thickness, cv2.LINE_AA) # ── Layout Calculation ───────────────────────────────────────────────────────── def _compute_layout(task: dict, canvas_h: int, canvas_w: int) -> dict: """Compute uniform grid layout for all pairs on the canvas.""" n_cols = len(task["train"]) + 1 n_rows = 2 padding = 12 outer_margin = 16 label_h = 20 usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding cell_w = usable_w // n_cols cell_h = usable_h // n_rows total_block_w = cell_w * n_cols + (n_cols - 1) * padding total_block_h = cell_h * n_rows + (n_rows - 1) * padding margin_x = (canvas_w - total_block_w) // 2 margin_y = (canvas_h - total_block_h) // 2 return { "n_cols": n_cols, "n_rows": n_rows, "cell_w": cell_w, "cell_h": cell_h, "margin_x": margin_x, "margin_y": margin_y, "padding": padding, "label_h": label_h, } # ── Frame Rendering ──────────────────────────────────────────────────────────── def render_frame( task: dict, test_idx: int, rows_revealed: int | None, canvas_h: int = 720, canvas_w: int = 1280, ) -> np.ndarray: """Render one video frame as an RGB numpy array.""" canvas = np.full((canvas_h, canvas_w, 3), BG_COLOR, dtype=np.uint8) layout = _compute_layout(task, canvas_h, canvas_w) n_cols = layout["n_cols"] cell_w, cell_h = layout["cell_w"], layout["cell_h"] mx, my, pad = layout["margin_x"], layout["margin_y"], layout["padding"] train_pairs = task["train"] test_pair = task["test"][test_idx] for col in range(n_cols): x0 = mx + col * (cell_w + pad) if col < len(train_pairs): inp = np.array(train_pairs[col]["input"]) out = np.array(train_pairs[col]["output"]) _render_grid_to_region(canvas, inp, x0, my, cell_w, cell_h, f"Train {col+1} In") y1 = my + cell_h + pad _render_grid_to_region(canvas, out, x0, y1, cell_w, cell_h, f"Train {col+1} Out") else: test_in = np.array(test_pair["input"]) _render_grid_to_region(canvas, test_in, x0, my, cell_w, cell_h, "Test In") test_out = np.array(test_pair["output"]) y1 = my + cell_h + pad reveal = 0 if rows_revealed is None else rows_revealed _render_grid_to_region(canvas, test_out, x0, y1, cell_w, cell_h, "Test Out", rows_revealed=reveal) return canvas # ── Video Generation ─────────────────────────────────────────────────────────── def generate_video( task: dict, output_path: str, n_frames: int = 5, m_frames: int = 5, k_rate: float = 1.0, max_frames: int | None = None, fps: int = 15, canvas_h: int = 720, canvas_w: int = 1280, ) -> int: """Generate a single ARC task video. Returns total frame count.""" test_out = np.array(task["test"][0]["output"]) total_rows = test_out.shape[0] reveal_frames_natural = int(math.ceil(total_rows * k_rate)) total_natural = n_frames + reveal_frames_natural + m_frames if max_frames is not None and total_natural > max_frames: available_reveal = max(1, max_frames - n_frames - m_frames) effective_k = available_reveal / total_rows reveal_frames = available_reveal else: effective_k = k_rate reveal_frames = reveal_frames_natural total_frames = n_frames + reveal_frames + m_frames h = canvas_h if canvas_h % 2 == 0 else canvas_h + 1 w = canvas_w if canvas_w % 2 == 0 else canvas_w + 1 Path(output_path).parent.mkdir(parents=True, exist_ok=True) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h)) def _write(frame_rgb: np.ndarray) -> None: writer.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)) pbar = tqdm(total=total_frames, desc=" Frames", leave=False, unit="f") # Phase 1: Placeholder placeholder = render_frame(task, 0, None, h, w) for _ in range(n_frames): _write(placeholder) pbar.update(1) # Phase 2: Progressive reveal if effective_k >= 1: frames_per_row = effective_k row_cursor = 0 accumulated = 0.0 for _ in range(reveal_frames): accumulated += 1.0 if accumulated >= frames_per_row and row_cursor < total_rows: row_cursor += 1 accumulated -= frames_per_row _write(render_frame(task, 0, row_cursor, h, w)) pbar.update(1) else: rows_per_frame = 1.0 / effective_k row_accum = 0.0 for _ in range(reveal_frames): row_accum += rows_per_frame rows_shown = min(int(math.ceil(row_accum)), total_rows) _write(render_frame(task, 0, rows_shown, h, w)) pbar.update(1) # Phase 3: Full answer full = render_frame(task, 0, total_rows, h, w) for _ in range(m_frames): _write(full) pbar.update(1) pbar.close() writer.release() return total_frames # ── Metadata Cache ───────────────────────────────────────────────────────────── METADATA_FILE = ".metadata.json" def _build_params_dict( data_dir: str, n_frames: int, m_frames: int, k_rate: float, max_frames: int | None, fps: int, repeat_num: int, canvas_h: int, canvas_w: int, ) -> dict: """Build a JSON-serializable dict of generation parameters.""" return { "data_dir": str(Path(data_dir).resolve()), "n_frames": n_frames, "m_frames": m_frames, "k_rate": k_rate, "max_frames": max_frames, "fps": fps, "repeat_num": repeat_num, "canvas_h": canvas_h, "canvas_w": canvas_w, } def _load_metadata(out_path: Path) -> dict | None: meta_path = out_path / METADATA_FILE if not meta_path.exists(): return None try: with open(meta_path) as f: return json.load(f) except (json.JSONDecodeError, OSError): return None def _save_metadata(out_path: Path, params: dict, completed: set[str]) -> None: meta = {"params": params, "completed": sorted(completed)} tmp_path = (out_path / METADATA_FILE).with_suffix(".tmp") with open(tmp_path, "w") as f: json.dump(meta, f, indent=2) tmp_path.replace(out_path / METADATA_FILE) def _clear_output_dir(out_path: Path) -> None: if out_path.exists(): for mp4 in out_path.glob("*.mp4"): mp4.unlink() meta = out_path / METADATA_FILE if meta.exists(): meta.unlink() def compute_test_output_bbox(task: dict, canvas_h: int, canvas_w: int) -> dict: """Compute the pixel bounding box of the test output cell.""" n_cols = len(task["train"]) + 1 n_rows = 2 padding = 12 outer_margin = 16 label_h = 20 usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding cell_w = usable_w // n_cols cell_h = usable_h // n_rows total_block_w = cell_w * n_cols + (n_cols - 1) * padding total_block_h = cell_h * n_rows + (n_rows - 1) * padding margin_x = (canvas_w - total_block_w) // 2 margin_y = (canvas_h - total_block_h) // 2 col = n_cols - 1 x0 = margin_x + col * (cell_w + padding) y0 = margin_y + cell_h + padding test_out = np.array(task["test"][0]["output"]) gr, gc = test_out.shape return { "grid_rows": gr, "grid_cols": gc, "x0": x0, "y0": y0, "grid_x0": x0, "grid_y0": y0 + label_h, "grid_w": cell_w, "grid_h": cell_h - label_h, "cell_w": cell_w, "cell_h": cell_h, } def save_video_metadata( task: dict, perm: list[int], seed: int, canvas_h: int, canvas_w: int, meta_path: str, ) -> None: """Save per-video metadata JSON for evaluation.""" bbox = compute_test_output_bbox(task, canvas_h, canvas_w) permuted_palette = ARC_COLORS[perm].tolist() meta = { "seed": seed, "color_perm": perm, "permuted_palette": permuted_palette, "canvas_h": canvas_h, "canvas_w": canvas_w, **bbox, } Path(meta_path).parent.mkdir(parents=True, exist_ok=True) with open(meta_path, "w") as f: json.dump(meta, f, indent=2) # ── Train/Test Split ─────────────────────────────────────────────────────────── def _write_splits( all_samples: list[dict], out_path: Path, train_ratio: float, ) -> None: """Stratified train/test split by source, write JSONL and CSV files.""" rng = random.Random(42) by_source: dict[str, list[dict]] = {} for s in all_samples: by_source.setdefault(s["source"], []).append(s) train_samples, test_samples = [], [] for source in sorted(by_source): group = by_source[source] rng.shuffle(group) split_idx = int(len(group) * train_ratio) train_samples.extend(group[:split_idx]) test_samples.extend(group[split_idx:]) rng.shuffle(train_samples) rng.shuffle(test_samples) # JSONL for name, samples in [("train", train_samples), ("test", test_samples)]: with open(out_path / f"{name}.jsonl", "w") as f: for s in samples: f.write(json.dumps(s) + "\n") # CSV for name, samples in [("train", train_samples), ("test", test_samples)]: with open(out_path / f"{name}.csv", "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow(["video", "meta", "task_id", "source", "prompt"]) for s in samples: writer.writerow([s["video"], s["meta"], s["task_id"], s["source"], s["prompt"]]) tqdm.write(f" Split: {len(train_samples)} train / {len(test_samples)} test") tqdm.write(f" Written: train.jsonl, test.jsonl, train.csv, test.csv") # ── Batch Processing ─────────────────────────────────────────────────────────── def process_all( data_dir: str = "data", output_dir: str = "videos", n_frames: int = 5, m_frames: int = 5, k_rate: float = 1.0, max_frames: int | None = None, fps: int = 15, repeat_num: int = 3, canvas_h: int = 720, canvas_w: int = 1280, train_ratio: float = 0.9, prompt: str = "Predict the test output grid based on the input-output training examples.", ) -> None: """Generate videos for all ARC tasks with train/test JSONL splits. Supports resumption via metadata cache. After generation, writes stratified train.jsonl / test.jsonl / CSV files. """ data_path = Path(data_dir) out_path = Path(output_dir) out_path.mkdir(parents=True, exist_ok=True) current_params = _build_params_dict( data_dir, n_frames, m_frames, k_rate, max_frames, fps, repeat_num, canvas_h, canvas_w, ) existing_meta = _load_metadata(out_path) if existing_meta is not None and existing_meta.get("params") == current_params: completed: set[str] = { name for name in existing_meta.get("completed", []) if (out_path / name).exists() } tqdm.write(f"Resuming: {len(completed)} videos already completed.") else: if existing_meta is not None: tqdm.write("Parameters changed — clearing and restarting.") _clear_output_dir(out_path) completed = set() _save_metadata(out_path, current_params, completed) task_files = sorted( list((data_path / "training").glob("*.json")) + list((data_path / "evaluation").glob("*.json")) ) if not task_files: print(f"No task files found in {data_path}/training or {data_path}/evaluation") return total = len(task_files) * repeat_num pbar = tqdm(total=total, desc="Tasks", unit="vid", initial=len(completed)) save_every = 20 new_since_save = 0 all_samples: list[dict] = [] for fpath in task_files: task_id = fpath.stem source = fpath.parent.name # "training" or "evaluation" with open(fpath) as f: task_raw = json.load(f) if not task_raw.get("test") or "output" not in task_raw["test"][0]: pbar.update(repeat_num) continue test_out_arr = np.array(task_raw["test"][0]["output"]) grid_rows, grid_cols = test_out_arr.shape used_perms: set[tuple[int, ...]] = set() seed = 0 generated = 0 while generated < repeat_num: perm = generate_color_permutation(seed) perm_key = tuple(perm) if perm_key not in used_perms: used_perms.add(perm_key) video_name = f"{task_id}_{seed}.mp4" meta_name = f"{task_id}_{seed}.meta.json" sample_meta = { "task_id": task_id, "source": source, "seed": seed, "video": video_name, "meta": meta_name, "prompt": prompt, "grid_rows": int(grid_rows), "grid_cols": int(grid_cols), "color_perm": perm, "n_train_pairs": len(task_raw["train"]), } if video_name not in completed: permuted_task = permute_task(task_raw, perm) pbar.set_postfix_str(f"{task_id}_{seed}") video_file = str(out_path / video_name) frame_count = generate_video( permuted_task, video_file, n_frames=n_frames, m_frames=m_frames, k_rate=k_rate, max_frames=max_frames, fps=fps, canvas_h=canvas_h, canvas_w=canvas_w, ) sample_meta["frame_count"] = frame_count meta_file = video_file.replace(".mp4", ".meta.json") save_video_metadata( task=permuted_task, perm=perm, seed=seed, canvas_h=canvas_h, canvas_w=canvas_w, meta_path=meta_file, ) completed.add(video_name) pbar.update(1) new_since_save += 1 if new_since_save >= save_every: _save_metadata(out_path, current_params, completed) new_since_save = 0 all_samples.append(sample_meta) generated += 1 seed += 1 if seed > repeat_num + 1000: tqdm.write(f"Warning: could not generate {repeat_num} unique perms for {task_id}") pbar.update(repeat_num - generated) break pbar.close() _save_metadata(out_path, current_params, completed) # Write train/test splits _write_splits(all_samples, out_path, train_ratio) tqdm.write(f"Done. {len(completed)} videos, {len(all_samples)} samples in {out_path}/") # ── CLI ──────────────────────────────────────────────────────────────────────── def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="ARC-AGI-2 Video Generator") p.add_argument("--data_dir", type=str, default="ARC-AGI-2/data") p.add_argument("--output_dir", type=str, default="videos") p.add_argument("--n_frames", type=int, default=5) p.add_argument("--m_frames", type=int, default=5) p.add_argument("--k_rate", type=float, default=1.0) p.add_argument("--max_frames", type=int, default=None) p.add_argument("--fps", type=int, default=15) p.add_argument("--repeat_num", type=int, default=3) p.add_argument("--resolution", type=int, nargs=2, default=[720, 1280], metavar=("H", "W")) p.add_argument("--train_ratio", type=float, default=0.9, help="Train split ratio (default: 0.9)") p.add_argument("--prompt", type=str, default="Predict the test output grid based on the input-output training examples.") return p.parse_args() if __name__ == "__main__": args = parse_args() process_all( data_dir=args.data_dir, output_dir=args.output_dir, n_frames=args.n_frames, m_frames=args.m_frames, k_rate=args.k_rate, max_frames=args.max_frames, fps=args.fps, repeat_num=args.repeat_num, canvas_h=args.resolution[0], canvas_w=args.resolution[1], train_ratio=args.train_ratio, prompt=args.prompt, )