| """ARC-AGI-2 Task Video Generator. |
| |
| Generates animated videos for ARC tasks that progressively reveal test outputs. |
| Supports random color permutation for data augmentation. |
| Renders directly to a target resolution with auto-calculated grid layout. |
| Outputs train.jsonl / test.jsonl with stratified splits. |
| |
| Usage: |
| python video_generate.py --data_dir data --output_dir videos \ |
| --n_frames 5 --m_frames 5 --k_rate 1.0 \ |
--repeat_num 3 --fps 15 \
| --resolution 720 1280 --train_ratio 0.9 |
| """ |
|
|
| import json |
| import csv |
| import argparse |
| import random |
| import math |
| from pathlib import Path |
|
|
| from tqdm import tqdm |
|
|
| import cv2 |
| import numpy as np |
|
|
| |
|
|
# RGB palette for the 10 ARC cell values (row index == ARC color code 0-9).
ARC_COLORS = np.array([
    [0x00, 0x00, 0x00],  # 0: black
    [0x00, 0x74, 0xD9],  # 1: blue
    [0xFF, 0x41, 0x36],  # 2: red
    [0x2E, 0xCC, 0x40],  # 3: green
    [0xFF, 0xDC, 0x00],  # 4: yellow
    [0xAA, 0xAA, 0xAA],  # 5: grey
    [0xF0, 0x12, 0xBE],  # 6: magenta
    [0xFF, 0x85, 0x1B],  # 7: orange
    [0x7F, 0xDB, 0xFF],  # 8: azure
    [0x87, 0x0C, 0x25],  # 9: maroon
], dtype=np.uint8)


GRID_LINE_COLOR = (200, 200, 200)  # light grey grid lines drawn over cells
LABEL_COLOR = (40, 40, 40)         # dark grey text for panel labels
BG_COLOR = (255, 255, 255)         # white canvas background
# Fill for test-output rows that have not been revealed yet.
UNREVEALED_COLOR = np.array([220, 220, 220], dtype=np.uint8)
|
|
|
|
| |
|
|
def generate_color_permutation(seed: int) -> list[int]:
    """Return a seed-deterministic shuffling of the 10 ARC color codes."""
    shuffled = list(range(10))
    random.Random(seed).shuffle(shuffled)
    return shuffled
|
|
|
|
def apply_color_permutation(grid: list[list[int]], perm: list[int]) -> list[list[int]]:
    """Map every cell value of *grid* through the permutation table *perm*."""
    remap = perm.__getitem__
    return [list(map(remap, row)) for row in grid]
|
|
|
|
def permute_task(task: dict, perm: list[int]) -> dict:
    """Return a deep-copied task with all grids color-permuted.

    Test pairs without an "output" key keep only their permuted input.
    """
    def _remap(grid: list[list[int]]) -> list[list[int]]:
        # Map every cell value through the permutation table.
        return [[perm[v] for v in row] for row in grid]

    remapped_test = []
    for pair in task["test"]:
        entry = {"input": _remap(pair["input"])}
        if "output" in pair:
            entry["output"] = _remap(pair["output"])
        remapped_test.append(entry)

    return {
        "train": [
            {"input": _remap(p["input"]), "output": _remap(p["output"])}
            for p in task["train"]
        ],
        "test": remapped_test,
    }
|
|
|
|
| |
|
|
def _render_grid_to_region(
    canvas: np.ndarray,
    grid: np.ndarray,
    x0: int, y0: int, w: int, h: int,
    label: str,
    rows_revealed: int | None = None,
) -> None:
    """Draw one ARC grid, with a text label above it, into a canvas region.

    When ``rows_revealed`` is given, rows with index >= rows_revealed are
    filled with the placeholder color instead of their true cell colors.
    """
    label_h = 20              # vertical strip reserved for the label text
    top = y0 + label_h        # grid starts below the label strip
    avail_h = h - label_h
    avail_w = w

    # Region too small to hold a grid — nothing to draw.
    if avail_h <= 0 or avail_w <= 0:
        return

    n_rows, n_cols = grid.shape
    step_y = avail_h / n_rows
    step_x = avail_w / n_cols

    # Filled cell rectangles.
    for row in range(n_rows):
        hidden = rows_revealed is not None and row >= rows_revealed
        py0 = int(top + row * step_y)
        py1 = int(top + (row + 1) * step_y)
        for col in range(n_cols):
            px0 = int(x0 + col * step_x)
            px1 = int(x0 + (col + 1) * step_x)
            if hidden:
                fill = tuple(UNREVEALED_COLOR.tolist())
            else:
                fill = tuple(ARC_COLORS[grid[row, col]].tolist())
            cv2.rectangle(canvas, (px0, py0), (px1, py1), fill, -1)

    # Grid lines drawn over the cells.
    for row in range(n_rows + 1):
        y = int(top + row * step_y)
        cv2.line(canvas, (x0, y), (x0 + avail_w, y), GRID_LINE_COLOR, 1)
    for col in range(n_cols + 1):
        x = int(x0 + col * step_x)
        cv2.line(canvas, (x, top), (x, top + avail_h), GRID_LINE_COLOR, 1)

    # Label, horizontally centered within the region.
    font = cv2.FONT_HERSHEY_SIMPLEX
    (text_w, _text_h), _ = cv2.getTextSize(label, font, 0.8, 1)
    text_x = x0 + (w - text_w) // 2
    text_y = y0 + label_h - 4
    cv2.putText(canvas, label, (text_x, text_y), font, 0.8, LABEL_COLOR, 1, cv2.LINE_AA)
|
|
|
|
| |
|
|
| def _compute_layout(task: dict, canvas_h: int, canvas_w: int) -> dict: |
| """Compute uniform grid layout for all pairs on the canvas.""" |
| n_cols = len(task["train"]) + 1 |
| n_rows = 2 |
|
|
| padding = 12 |
| outer_margin = 16 |
| label_h = 20 |
|
|
| usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding |
| usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding |
|
|
| cell_w = usable_w // n_cols |
| cell_h = usable_h // n_rows |
|
|
| total_block_w = cell_w * n_cols + (n_cols - 1) * padding |
| total_block_h = cell_h * n_rows + (n_rows - 1) * padding |
| margin_x = (canvas_w - total_block_w) // 2 |
| margin_y = (canvas_h - total_block_h) // 2 |
|
|
| return { |
| "n_cols": n_cols, "n_rows": n_rows, |
| "cell_w": cell_w, "cell_h": cell_h, |
| "margin_x": margin_x, "margin_y": margin_y, |
| "padding": padding, "label_h": label_h, |
| } |
|
|
|
|
| |
|
|
def render_frame(
    task: dict, test_idx: int, rows_revealed: int | None,
    canvas_h: int = 720, canvas_w: int = 1280,
) -> np.ndarray:
    """Render one video frame (RGB uint8 array).

    All train pairs are drawn fully; the test output in the last column
    is drawn with ``rows_revealed`` rows visible (None means fully hidden).
    """
    canvas = np.full((canvas_h, canvas_w, 3), BG_COLOR, dtype=np.uint8)
    layout = _compute_layout(task, canvas_h, canvas_w)

    cw = layout["cell_w"]
    ch = layout["cell_h"]
    pad = layout["padding"]
    left = layout["margin_x"]
    top = layout["margin_y"]
    bottom_y = top + ch + pad            # y of the second (output) row
    test_col = layout["n_cols"] - 1      # test pair lives in the last column

    pairs = task["train"]
    chosen_test = task["test"][test_idx]

    for col in range(layout["n_cols"]):
        x = left + col * (cw + pad)
        if col == test_col:
            _render_grid_to_region(
                canvas, np.array(chosen_test["input"]), x, top, cw, ch, "Test In")
            _render_grid_to_region(
                canvas, np.array(chosen_test["output"]), x, bottom_y, cw, ch,
                "Test Out",
                rows_revealed=0 if rows_revealed is None else rows_revealed)
        else:
            _render_grid_to_region(
                canvas, np.array(pairs[col]["input"]), x, top, cw, ch,
                f"Train {col+1} In")
            _render_grid_to_region(
                canvas, np.array(pairs[col]["output"]), x, bottom_y, cw, ch,
                f"Train {col+1} Out")

    return canvas
|
|
|
|
| |
|
|
def generate_video(
    task: dict, output_path: str,
    n_frames: int = 5, m_frames: int = 5, k_rate: float = 1.0,
    max_frames: int | None = None, fps: int = 15,
    canvas_h: int = 720, canvas_w: int = 1280,
) -> int:
    """Generate a single ARC task video. Returns total frame count.

    The video has three phases: ``n_frames`` static frames with the test
    output fully hidden, a reveal phase paced by ``k_rate`` (frames spent
    per revealed output row), then ``m_frames`` static frames with the
    output fully revealed. ``max_frames`` caps the total by compressing
    the reveal phase.

    Args:
        task: Task dict; ``task["test"][0]`` must contain an "output" grid.
        output_path: Destination .mp4 path (parent dirs created as needed).
        n_frames: Leading hold frames (output hidden).
        m_frames: Trailing hold frames (output revealed).
        k_rate: Frames per output row during the reveal phase.
        max_frames: Optional hard cap on the total frame count.
        fps: Output video frame rate.
        canvas_h: Target height (bumped to even; codecs require it).
        canvas_w: Target width (bumped to even; codecs require it).

    Returns:
        The number of frames written.
    """
    test_out = np.array(task["test"][0]["output"])
    total_rows = test_out.shape[0]

    # Natural (uncapped) reveal length: k_rate frames per row.
    reveal_frames_natural = int(math.ceil(total_rows * k_rate))
    total_natural = n_frames + reveal_frames_natural + m_frames

    if max_frames is not None and total_natural > max_frames:
        # Compress the reveal phase into whatever budget remains (>= 1).
        available_reveal = max(1, max_frames - n_frames - m_frames)
        effective_k = available_reveal / total_rows
        reveal_frames = available_reveal
    else:
        effective_k = k_rate
        reveal_frames = reveal_frames_natural

    total_frames = n_frames + reveal_frames + m_frames

    # mp4 encoders generally require even frame dimensions.
    h = canvas_h if canvas_h % 2 == 0 else canvas_h + 1
    w = canvas_w if canvas_w % 2 == 0 else canvas_w + 1

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    def _write(frame_rgb: np.ndarray) -> None:
        # Frames are rendered in RGB; VideoWriter expects BGR.
        writer.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))

    pbar = tqdm(total=total_frames, desc=" Frames", leave=False, unit="f")

    # try/finally so the encoder and progress bar are always released,
    # even if rendering raises (previously a failure leaked the writer
    # and left the output file unfinalized).
    try:
        # Phase 1: hold with the test output fully hidden.
        placeholder = render_frame(task, 0, None, h, w)
        for _ in range(n_frames):
            _write(placeholder)
            pbar.update(1)

        # Phase 2: progressive row-by-row reveal.
        if effective_k >= 1:
            # Slow reveal: spend effective_k frames on each row.
            frames_per_row = effective_k
            row_cursor = 0
            accumulated = 0.0
            for _ in range(reveal_frames):
                accumulated += 1.0
                if accumulated >= frames_per_row and row_cursor < total_rows:
                    row_cursor += 1
                    accumulated -= frames_per_row
                _write(render_frame(task, 0, row_cursor, h, w))
                pbar.update(1)
        else:
            # Fast reveal: show multiple rows per frame.
            rows_per_frame = 1.0 / effective_k
            row_accum = 0.0
            for _ in range(reveal_frames):
                row_accum += rows_per_frame
                rows_shown = min(int(math.ceil(row_accum)), total_rows)
                _write(render_frame(task, 0, rows_shown, h, w))
                pbar.update(1)

        # Phase 3: hold with the output fully revealed.
        full = render_frame(task, 0, total_rows, h, w)
        for _ in range(m_frames):
            _write(full)
            pbar.update(1)
    finally:
        pbar.close()
        writer.release()

    return total_frames
|
|
|
|
| |
|
|
| METADATA_FILE = ".metadata.json" |
|
|
|
|
| def _build_params_dict( |
| data_dir: str, n_frames: int, m_frames: int, k_rate: float, |
| max_frames: int | None, fps: int, repeat_num: int, |
| canvas_h: int, canvas_w: int, |
| ) -> dict: |
| """Build a JSON-serializable dict of generation parameters.""" |
| return { |
| "data_dir": str(Path(data_dir).resolve()), |
| "n_frames": n_frames, "m_frames": m_frames, |
| "k_rate": k_rate, "max_frames": max_frames, |
| "fps": fps, "repeat_num": repeat_num, |
| "canvas_h": canvas_h, "canvas_w": canvas_w, |
| } |
|
|
|
|
def _load_metadata(out_path: Path) -> dict | None:
    """Load the resume cache from *out_path*; None when missing or unreadable."""
    meta_file = out_path / METADATA_FILE
    if not meta_file.exists():
        return None
    try:
        # Corrupt or unreadable cache is treated the same as no cache.
        return json.loads(meta_file.read_text())
    except (json.JSONDecodeError, OSError):
        return None
|
|
|
|
def _save_metadata(out_path: Path, params: dict, completed: set[str]) -> None:
    """Write the resume cache atomically (temp file, then rename)."""
    final_path = out_path / METADATA_FILE
    payload = json.dumps({"params": params, "completed": sorted(completed)}, indent=2)
    tmp_path = final_path.with_suffix(".tmp")
    tmp_path.write_text(payload)
    # Atomic on POSIX: readers never observe a half-written cache.
    tmp_path.replace(final_path)
|
|
|
|
def _clear_output_dir(out_path: Path) -> None:
    """Delete all generated .mp4 files and the resume cache, if present."""
    if not out_path.exists():
        return
    for video in out_path.glob("*.mp4"):
        video.unlink()
    (out_path / METADATA_FILE).unlink(missing_ok=True)
|
|
|
|
def compute_test_output_bbox(task: dict, canvas_h: int, canvas_w: int) -> dict:
    """Compute the pixel bounding box of the test output cell.

    Delegates to ``_compute_layout`` so the geometry is guaranteed to
    match what ``render_frame`` actually draws (previously this function
    duplicated the layout arithmetic inline, which risked silent drift
    if the layout constants ever changed in one place but not the other).

    Returns:
        Dict with the output grid dimensions (``grid_rows``/``grid_cols``),
        the cell's top-left corner (``x0``/``y0``), the drawable grid area
        below the label strip (``grid_x0``/``grid_y0``/``grid_w``/``grid_h``),
        and the overall cell size (``cell_w``/``cell_h``).
    """
    layout = _compute_layout(task, canvas_h, canvas_w)
    cell_w, cell_h = layout["cell_w"], layout["cell_h"]
    label_h = layout["label_h"]

    # The test pair occupies the last column; its output is the bottom row.
    col = layout["n_cols"] - 1
    x0 = layout["margin_x"] + col * (cell_w + layout["padding"])
    y0 = layout["margin_y"] + cell_h + layout["padding"]

    test_out = np.array(task["test"][0]["output"])
    gr, gc = test_out.shape

    return {
        "grid_rows": gr, "grid_cols": gc,
        "x0": x0, "y0": y0,
        "grid_x0": x0, "grid_y0": y0 + label_h,
        "grid_w": cell_w, "grid_h": cell_h - label_h,
        "cell_w": cell_w, "cell_h": cell_h,
    }
|
|
|
|
def save_video_metadata(
    task: dict, perm: list[int], seed: int,
    canvas_h: int, canvas_w: int, meta_path: str,
) -> None:
    """Save per-video metadata JSON (seed, permutation, bbox) for evaluation."""
    meta = {
        "seed": seed,
        "color_perm": perm,
        # Palette rows reordered by the permutation, for color decoding.
        "permuted_palette": ARC_COLORS[perm].tolist(),
        "canvas_h": canvas_h,
        "canvas_w": canvas_w,
    }
    # Append the pixel bbox of the test-output cell.
    meta.update(compute_test_output_bbox(task, canvas_h, canvas_w))

    target = Path(meta_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w") as f:
        json.dump(meta, f, indent=2)
|
|
|
|
| |
|
|
def _write_splits(
    all_samples: list[dict],
    out_path: Path,
    train_ratio: float,
) -> None:
    """Stratified train/test split by source, write JSONL and CSV files."""
    rng = random.Random(42)  # fixed seed: reproducible splits

    # Group samples by source directory so each is split proportionally.
    grouped: dict[str, list[dict]] = {}
    for sample in all_samples:
        grouped.setdefault(sample["source"], []).append(sample)

    train_samples: list[dict] = []
    test_samples: list[dict] = []
    for key in sorted(grouped):
        bucket = grouped[key]
        rng.shuffle(bucket)
        cut = int(len(bucket) * train_ratio)
        train_samples += bucket[:cut]
        test_samples += bucket[cut:]

    rng.shuffle(train_samples)
    rng.shuffle(test_samples)

    splits = [("train", train_samples), ("test", test_samples)]

    # JSONL: one sample dict per line.
    for name, samples in splits:
        with open(out_path / f"{name}.jsonl", "w") as f:
            f.writelines(json.dumps(s) + "\n" for s in samples)

    # CSV: flat index with the columns evaluators need.
    for name, samples in splits:
        with open(out_path / f"{name}.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["video", "meta", "task_id", "source", "prompt"])
            writer.writerows(
                [s["video"], s["meta"], s["task_id"], s["source"], s["prompt"]]
                for s in samples
            )

    tqdm.write(f" Split: {len(train_samples)} train / {len(test_samples)} test")
    tqdm.write(f" Written: train.jsonl, test.jsonl, train.csv, test.csv")
|
|
|
|
| |
|
|
def process_all(
    data_dir: str = "data",
    output_dir: str = "videos",
    n_frames: int = 5,
    m_frames: int = 5,
    k_rate: float = 1.0,
    max_frames: int | None = None,
    fps: int = 15,
    repeat_num: int = 3,
    canvas_h: int = 720,
    canvas_w: int = 1280,
    train_ratio: float = 0.9,
    prompt: str = "Predict the test output grid based on the input-output training examples.",
) -> None:
    """Generate videos for all ARC tasks with train/test JSONL splits.

    Scans ``data_dir``/training and ``data_dir``/evaluation for task JSON
    files and renders ``repeat_num`` color-permuted videos per task.

    Supports resumption via a metadata cache in ``output_dir``: when the
    generation parameters are unchanged, already-rendered videos are
    skipped; otherwise the output directory is cleared and the run starts
    over. After generation, writes stratified train.jsonl / test.jsonl /
    CSV index files covering every sample of this invocation.
    """
    data_path = Path(data_dir)
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # Decide whether a previous run with identical parameters can resume.
    current_params = _build_params_dict(
        data_dir, n_frames, m_frames, k_rate, max_frames, fps, repeat_num,
        canvas_h, canvas_w,
    )
    existing_meta = _load_metadata(out_path)

    if existing_meta is not None and existing_meta.get("params") == current_params:
        # Keep only completed entries whose video file still exists on disk.
        completed: set[str] = {
            name for name in existing_meta.get("completed", [])
            if (out_path / name).exists()
        }
        tqdm.write(f"Resuming: {len(completed)} videos already completed.")
    else:
        if existing_meta is not None:
            # NOTE(review): the "β" in this message looks mojibake'd —
            # probably intended as an arrow ("→"); confirm before fixing.
            tqdm.write("Parameters changed β clearing and restarting.")
            _clear_output_dir(out_path)
        completed = set()
        _save_metadata(out_path, current_params, completed)

    # Collect task files from both subsets, sorted for determinism.
    task_files = sorted(
        list((data_path / "training").glob("*.json"))
        + list((data_path / "evaluation").glob("*.json"))
    )
    if not task_files:
        print(f"No task files found in {data_path}/training or {data_path}/evaluation")
        return

    total = len(task_files) * repeat_num
    # initial=len(completed): the bar starts past already-finished videos.
    pbar = tqdm(total=total, desc="Tasks", unit="vid", initial=len(completed))
    save_every = 20   # checkpoint the resume cache every N new videos
    new_since_save = 0
    all_samples: list[dict] = []

    for fpath in task_files:
        task_id = fpath.stem
        source = fpath.parent.name  # "training" or "evaluation"
        with open(fpath) as f:
            task_raw = json.load(f)

        # Skip tasks without a test output grid — nothing to reveal.
        if not task_raw.get("test") or "output" not in task_raw["test"][0]:
            pbar.update(repeat_num)
            continue

        test_out_arr = np.array(task_raw["test"][0]["output"])
        grid_rows, grid_cols = test_out_arr.shape

        # Try seeds 0, 1, 2, ... until repeat_num UNIQUE permutations found.
        used_perms: set[tuple[int, ...]] = set()
        seed = 0
        generated = 0

        while generated < repeat_num:
            perm = generate_color_permutation(seed)
            perm_key = tuple(perm)

            if perm_key not in used_perms:
                used_perms.add(perm_key)
                video_name = f"{task_id}_{seed}.mp4"
                meta_name = f"{task_id}_{seed}.meta.json"

                sample_meta = {
                    "task_id": task_id,
                    "source": source,
                    "seed": seed,
                    "video": video_name,
                    "meta": meta_name,
                    "prompt": prompt,
                    "grid_rows": int(grid_rows),
                    "grid_cols": int(grid_cols),
                    "color_perm": perm,
                    "n_train_pairs": len(task_raw["train"]),
                }

                if video_name not in completed:
                    permuted_task = permute_task(task_raw, perm)
                    pbar.set_postfix_str(f"{task_id}_{seed}")
                    video_file = str(out_path / video_name)

                    frame_count = generate_video(
                        permuted_task, video_file,
                        n_frames=n_frames, m_frames=m_frames, k_rate=k_rate,
                        max_frames=max_frames, fps=fps,
                        canvas_h=canvas_h, canvas_w=canvas_w,
                    )
                    # NOTE(review): samples skipped on resume never receive
                    # "frame_count" — confirm downstream tolerates its absence.
                    sample_meta["frame_count"] = frame_count

                    meta_file = video_file.replace(".mp4", ".meta.json")
                    save_video_metadata(
                        task=permuted_task, perm=perm, seed=seed,
                        canvas_h=canvas_h, canvas_w=canvas_w, meta_path=meta_file,
                    )

                    completed.add(video_name)
                    pbar.update(1)
                    new_since_save += 1

                    # Periodic checkpoint: a crash loses at most save_every videos.
                    if new_since_save >= save_every:
                        _save_metadata(out_path, current_params, completed)
                        new_since_save = 0

                all_samples.append(sample_meta)
                generated += 1

            seed += 1
            # Safety valve against an (improbable) shortage of unique perms.
            if seed > repeat_num + 1000:
                tqdm.write(f"Warning: could not generate {repeat_num} unique perms for {task_id}")
                pbar.update(repeat_num - generated)
                break

    pbar.close()
    _save_metadata(out_path, current_params, completed)

    # Stratified train/test split over everything collected this run.
    _write_splits(all_samples, out_path, train_ratio)

    tqdm.write(f"Done. {len(completed)} videos, {len(all_samples)} samples in {out_path}/")
|
|
|
|
| |
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(description="ARC-AGI-2 Video Generator")
    parser.add_argument("--data_dir", type=str, default="ARC-AGI-2/data")
    parser.add_argument("--output_dir", type=str, default="videos")
    parser.add_argument("--n_frames", type=int, default=5)
    parser.add_argument("--m_frames", type=int, default=5)
    parser.add_argument("--k_rate", type=float, default=1.0)
    parser.add_argument("--max_frames", type=int, default=None)
    parser.add_argument("--fps", type=int, default=15)
    parser.add_argument("--repeat_num", type=int, default=3)
    parser.add_argument(
        "--resolution", type=int, nargs=2, default=[720, 1280], metavar=("H", "W"))
    parser.add_argument(
        "--train_ratio", type=float, default=0.9,
        help="Train split ratio (default: 0.9)")
    parser.add_argument(
        "--prompt", type=str,
        default="Predict the test output grid based on the input-output training examples.")
    return parser.parse_args()
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: parse arguments and run the full generation pipeline.
    args = parse_args()
    process_all(
        data_dir=args.data_dir,
        output_dir=args.output_dir,
        n_frames=args.n_frames,
        m_frames=args.m_frames,
        k_rate=args.k_rate,
        max_frames=args.max_frames,
        fps=args.fps,
        repeat_num=args.repeat_num,
        canvas_h=args.resolution[0],  # --resolution is given as H W
        canvas_w=args.resolution[1],
        train_ratio=args.train_ratio,
        prompt=args.prompt,
    )