# Visual-Reasoning / ARC / video_generate.py
# (uploaded via upload-large-folder tool, commit 460dc79)
"""ARC-AGI-2 Task Video Generator.
Generates animated videos for ARC tasks that progressively reveal test outputs.
Supports random color permutation for data augmentation.
Renders directly to a target resolution with auto-calculated grid layout.
Outputs train.jsonl / test.jsonl with stratified splits.
Usage:
python video_generate.py --data_dir data --output_dir videos \
--n_frames 5 --m_frames 5 --k_rate 1.0 \
--repeat_num 3 --fps 15 \
--resolution 720 1280 --train_ratio 0.9
"""
import json
import csv
import argparse
import random
import math
from pathlib import Path
from tqdm import tqdm
import cv2
import numpy as np
# ── ARC Color Palette (RGB) ───────────────────────────────────────────────────
# Canonical 10-color ARC palette, indexed directly by cell value 0-9.
ARC_COLORS = np.array([
    [0x00, 0x00, 0x00],  # 0: black
    [0x00, 0x74, 0xD9],  # 1: blue
    [0xFF, 0x41, 0x36],  # 2: red
    [0x2E, 0xCC, 0x40],  # 3: green
    [0xFF, 0xDC, 0x00],  # 4: yellow
    [0xAA, 0xAA, 0xAA],  # 5: grey
    [0xF0, 0x12, 0xBE],  # 6: magenta
    [0xFF, 0x85, 0x1B],  # 7: orange
    [0x7F, 0xDB, 0xFF],  # 8: light blue
    [0x87, 0x0C, 0x25],  # 9: maroon
], dtype=np.uint8)
# Cell border lines drawn over every grid (light grey).
GRID_LINE_COLOR = (200, 200, 200)
# Text color for the "Train N In/Out" / "Test In/Out" labels.
LABEL_COLOR = (40, 40, 40)
# Canvas background (white).
BG_COLOR = (255, 255, 255)
# Fill used for test-output rows that have not been revealed yet.
UNREVEALED_COLOR = np.array([220, 220, 220], dtype=np.uint8)
# ── Color Permutation ──────────────────────────────────────────────────────────
def generate_color_permutation(seed: int) -> list[int]:
    """Deterministically shuffle the 10 ARC color indices using *seed*."""
    shuffled = list(range(10))
    random.Random(seed).shuffle(shuffled)
    return shuffled
def apply_color_permutation(grid: list[list[int]], perm: list[int]) -> list[list[int]]:
    """Recolor a grid (nested list) by mapping each cell value through *perm*."""
    recolored = []
    for row in grid:
        recolored.append([perm[value] for value in row])
    return recolored
def permute_task(task: dict, perm: list[int]) -> dict:
    """Return a new task dict with every grid recolored through *perm*.

    Train pairs always carry both grids; test pairs may lack "output"
    (hidden solutions), in which case only the input is recolored.
    """
    def recolor(grid: list[list[int]]) -> list[list[int]]:
        return [[perm[cell] for cell in row] for row in grid]

    train = [
        {"input": recolor(pair["input"]), "output": recolor(pair["output"])}
        for pair in task["train"]
    ]
    test = []
    for pair in task["test"]:
        entry = {"input": recolor(pair["input"])}
        if "output" in pair:
            entry["output"] = recolor(pair["output"])
        test.append(entry)
    return {"train": train, "test": test}
# ── Direct Canvas Grid Rendering ───────────────────────────────────────────────
def _render_grid_to_region(
    canvas: np.ndarray,
    grid: np.ndarray,
    x0: int, y0: int, w: int, h: int,
    label: str,
    rows_revealed: int | None = None,
) -> None:
    """Draw one ARC grid, with a text label above it, inside a canvas rectangle.

    If rows_revealed is given, rows at index >= rows_revealed are painted
    with UNREVEALED_COLOR instead of their palette color.
    """
    label_h = 20  # vertical space reserved for the label text
    grid_y0 = y0 + label_h
    grid_h = h - label_h
    grid_w = w
    if grid_w <= 0 or grid_h <= 0:
        return
    n_rows, n_cols = grid.shape
    cell_h = grid_h / n_rows
    cell_w = grid_w / n_cols
    # Filled cells (float cell sizes, truncated per-edge, so cells tile exactly).
    for r in range(n_rows):
        top = int(grid_y0 + r * cell_h)
        bottom = int(grid_y0 + (r + 1) * cell_h)
        hidden = rows_revealed is not None and r >= rows_revealed
        for c in range(n_cols):
            left = int(x0 + c * cell_w)
            right = int(x0 + (c + 1) * cell_w)
            if hidden:
                fill = tuple(UNREVEALED_COLOR.tolist())
            else:
                fill = tuple(ARC_COLORS[grid[r, c]].tolist())
            cv2.rectangle(canvas, (left, top), (right, bottom), fill, -1)
    # Grid lines on top of the fills.
    for r in range(n_rows + 1):
        ly = int(grid_y0 + r * cell_h)
        cv2.line(canvas, (x0, ly), (x0 + grid_w, ly), GRID_LINE_COLOR, 1)
    for c in range(n_cols + 1):
        lx = int(x0 + c * cell_w)
        cv2.line(canvas, (lx, grid_y0), (lx, grid_y0 + grid_h), GRID_LINE_COLOR, 1)
    # Centered label in the reserved strip above the grid.
    font = cv2.FONT_HERSHEY_SIMPLEX
    (text_w, _), _ = cv2.getTextSize(label, font, 0.8, 1)
    cv2.putText(
        canvas, label,
        (x0 + (w - text_w) // 2, y0 + label_h - 4),
        font, 0.8, LABEL_COLOR, 1, cv2.LINE_AA,
    )
# ── Layout Calculation ─────────────────────────────────────────────────────────
def _compute_layout(task: dict, canvas_h: int, canvas_w: int) -> dict:
"""Compute uniform grid layout for all pairs on the canvas."""
n_cols = len(task["train"]) + 1
n_rows = 2
padding = 12
outer_margin = 16
label_h = 20
usable_w = canvas_w - 2 * outer_margin - (n_cols - 1) * padding
usable_h = canvas_h - 2 * outer_margin - (n_rows - 1) * padding
cell_w = usable_w // n_cols
cell_h = usable_h // n_rows
total_block_w = cell_w * n_cols + (n_cols - 1) * padding
total_block_h = cell_h * n_rows + (n_rows - 1) * padding
margin_x = (canvas_w - total_block_w) // 2
margin_y = (canvas_h - total_block_h) // 2
return {
"n_cols": n_cols, "n_rows": n_rows,
"cell_w": cell_w, "cell_h": cell_h,
"margin_x": margin_x, "margin_y": margin_y,
"padding": padding, "label_h": label_h,
}
# ── Frame Rendering ────────────────────────────────────────────────────────────
def render_frame(
    task: dict, test_idx: int, rows_revealed: int | None,
    canvas_h: int = 720, canvas_w: int = 1280,
) -> np.ndarray:
    """Render one video frame (all train pairs plus one test pair) as RGB.

    rows_revealed of None means "nothing revealed yet" (rendered as 0 rows).
    """
    frame = np.full((canvas_h, canvas_w, 3), BG_COLOR, dtype=np.uint8)
    layout = _compute_layout(task, canvas_h, canvas_w)
    cw, ch = layout["cell_w"], layout["cell_h"]
    mx, my, pad = layout["margin_x"], layout["margin_y"], layout["padding"]
    train_pairs = task["train"]
    test_pair = task["test"][test_idx]
    bottom_y = my + ch + pad  # y of the second (output) row
    for col in range(layout["n_cols"]):
        x0 = mx + col * (cw + pad)
        if col < len(train_pairs):
            pair = train_pairs[col]
            _render_grid_to_region(frame, np.array(pair["input"]), x0, my, cw, ch, f"Train {col+1} In")
            _render_grid_to_region(frame, np.array(pair["output"]), x0, bottom_y, cw, ch, f"Train {col+1} Out")
        else:
            # Last column is always the test pair; its output may be masked.
            _render_grid_to_region(frame, np.array(test_pair["input"]), x0, my, cw, ch, "Test In")
            reveal = rows_revealed if rows_revealed is not None else 0
            _render_grid_to_region(frame, np.array(test_pair["output"]), x0, bottom_y, cw, ch, "Test Out", rows_revealed=reveal)
    return frame
# ── Video Generation ───────────────────────────────────────────────────────────
def generate_video(
    task: dict, output_path: str,
    n_frames: int = 5, m_frames: int = 5, k_rate: float = 1.0,
    max_frames: int | None = None, fps: int = 15,
    canvas_h: int = 720, canvas_w: int = 1280,
) -> int:
    """Generate a single ARC task video. Returns total frame count.

    Three phases are written, in order:
      1. ``n_frames`` identical frames with the test output fully hidden.
      2. A reveal phase uncovering test-output rows top-to-bottom, where
         ``k_rate`` is the nominal number of frames spent per row.
      3. ``m_frames`` identical frames with the answer fully revealed.

    If the natural length exceeds ``max_frames``, the reveal phase is
    compressed (an effective k is recomputed) so the total fits the cap.
    """
    # Only the first test pair drives the reveal; its row count sets the pace.
    test_out = np.array(task["test"][0]["output"])
    total_rows = test_out.shape[0]
    reveal_frames_natural = int(math.ceil(total_rows * k_rate))
    total_natural = n_frames + reveal_frames_natural + m_frames
    if max_frames is not None and total_natural > max_frames:
        # Compress: intro/outro lengths stay, the reveal phase shrinks
        # (always keeping at least one reveal frame).
        available_reveal = max(1, max_frames - n_frames - m_frames)
        effective_k = available_reveal / total_rows
        reveal_frames = available_reveal
    else:
        effective_k = k_rate
        reveal_frames = reveal_frames_natural
    total_frames = n_frames + reveal_frames + m_frames
    # mp4 encoders generally require even frame dimensions.
    h = canvas_h if canvas_h % 2 == 0 else canvas_h + 1
    w = canvas_w if canvas_w % 2 == 0 else canvas_w + 1
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    def _write(frame_rgb: np.ndarray) -> None:
        # Frames are rendered in RGB; OpenCV's writer expects BGR.
        writer.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))

    pbar = tqdm(total=total_frames, desc=" Frames", leave=False, unit="f")
    # Phase 1: Placeholder
    placeholder = render_frame(task, 0, None, h, w)
    for _ in range(n_frames):
        _write(placeholder)
        pbar.update(1)
    # Phase 2: Progressive reveal
    if effective_k >= 1:
        # Slow reveal (>= 1 frame per row): advance one row each time the
        # frame accumulator crosses frames_per_row.
        frames_per_row = effective_k
        row_cursor = 0
        accumulated = 0.0
        for _ in range(reveal_frames):
            accumulated += 1.0
            if accumulated >= frames_per_row and row_cursor < total_rows:
                row_cursor += 1
                accumulated -= frames_per_row
            _write(render_frame(task, 0, row_cursor, h, w))
            pbar.update(1)
    else:
        # Fast reveal (k < 1): uncover several rows per frame, clamped to
        # the grid height.
        rows_per_frame = 1.0 / effective_k
        row_accum = 0.0
        for _ in range(reveal_frames):
            row_accum += rows_per_frame
            rows_shown = min(int(math.ceil(row_accum)), total_rows)
            _write(render_frame(task, 0, rows_shown, h, w))
            pbar.update(1)
    # Phase 3: Full answer
    full = render_frame(task, 0, total_rows, h, w)
    for _ in range(m_frames):
        _write(full)
        pbar.update(1)
    pbar.close()
    writer.release()
    return total_frames
# ── Metadata Cache ─────────────────────────────────────────────────────────────
# Name of the hidden resume-cache file written inside the output directory.
METADATA_FILE = ".metadata.json"
def _build_params_dict(
data_dir: str, n_frames: int, m_frames: int, k_rate: float,
max_frames: int | None, fps: int, repeat_num: int,
canvas_h: int, canvas_w: int,
) -> dict:
"""Build a JSON-serializable dict of generation parameters."""
return {
"data_dir": str(Path(data_dir).resolve()),
"n_frames": n_frames, "m_frames": m_frames,
"k_rate": k_rate, "max_frames": max_frames,
"fps": fps, "repeat_num": repeat_num,
"canvas_h": canvas_h, "canvas_w": canvas_w,
}
def _load_metadata(out_path: Path) -> dict | None:
    """Read the resume-cache JSON from *out_path*; None if absent or corrupt."""
    meta_path = out_path / METADATA_FILE
    try:
        text = meta_path.read_text()
    except OSError:
        # Missing file or unreadable directory — treat as "no cache".
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Truncated/corrupt cache (e.g. interrupted write) — ignore it.
        return None
def _save_metadata(out_path: Path, params: dict, completed: set[str]) -> None:
    """Atomically persist the resume cache (params + completed video names).

    Writes to a temp file first, then renames over the real cache so a
    crash mid-write never leaves a truncated JSON behind.
    """
    payload = {"params": params, "completed": sorted(completed)}
    final_path = out_path / METADATA_FILE
    tmp_path = final_path.with_suffix(".tmp")
    with open(tmp_path, "w") as handle:
        json.dump(payload, handle, indent=2)
    tmp_path.replace(final_path)
def _clear_output_dir(out_path: Path) -> None:
if out_path.exists():
for mp4 in out_path.glob("*.mp4"):
mp4.unlink()
meta = out_path / METADATA_FILE
if meta.exists():
meta.unlink()
def compute_test_output_bbox(task: dict, canvas_h: int, canvas_w: int) -> dict:
    """Compute the pixel bounding box of the test output cell.

    Delegates to _compute_layout so the bbox stays in lockstep with the
    layout actually used by render_frame; previously the layout arithmetic
    was duplicated here verbatim and could silently drift out of sync.

    Returns a dict with the grid dimensions, the cell's top-left corner
    (x0, y0), the grid area below the label strip (grid_x0/grid_y0,
    grid_w/grid_h), and the raw cell size (cell_w/cell_h).
    """
    layout = _compute_layout(task, canvas_h, canvas_w)
    cell_w, cell_h = layout["cell_w"], layout["cell_h"]
    padding, label_h = layout["padding"], layout["label_h"]
    col = layout["n_cols"] - 1  # test pair occupies the last column
    x0 = layout["margin_x"] + col * (cell_w + padding)
    y0 = layout["margin_y"] + cell_h + padding  # second row holds outputs
    test_out = np.array(task["test"][0]["output"])
    gr, gc = test_out.shape
    return {
        "grid_rows": gr, "grid_cols": gc,
        "x0": x0, "y0": y0,
        "grid_x0": x0, "grid_y0": y0 + label_h,
        "grid_w": cell_w, "grid_h": cell_h - label_h,
        "cell_w": cell_w, "cell_h": cell_h,
    }
def save_video_metadata(
    task: dict, perm: list[int], seed: int,
    canvas_h: int, canvas_w: int, meta_path: str,
) -> None:
    """Write per-video metadata JSON (seed, color permutation, bbox) for evaluation."""
    meta = {
        "seed": seed,
        "color_perm": perm,
        # Palette rows reordered by the permutation, as plain lists for JSON.
        "permuted_palette": ARC_COLORS[perm].tolist(),
        "canvas_h": canvas_h,
        "canvas_w": canvas_w,
    }
    meta.update(compute_test_output_bbox(task, canvas_h, canvas_w))
    target = Path(meta_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w") as handle:
        json.dump(meta, handle, indent=2)
# ── Train/Test Split ───────────────────────────────────────────────────────────
def _write_splits(
    all_samples: list[dict],
    out_path: Path,
    train_ratio: float,
) -> None:
    """Stratified train/test split by source; emit JSONL and CSV files.

    Samples are grouped by their "source" key, shuffled with a fixed seed
    so splits are reproducible, and split per group at *train_ratio*.
    """
    rng = random.Random(42)
    grouped: dict[str, list[dict]] = {}
    for sample in all_samples:
        grouped.setdefault(sample["source"], []).append(sample)
    train_rows: list[dict] = []
    test_rows: list[dict] = []
    for source in sorted(grouped):
        bucket = grouped[source]
        rng.shuffle(bucket)
        cut = int(len(bucket) * train_ratio)
        train_rows += bucket[:cut]
        test_rows += bucket[cut:]
    # Final shuffles so sources are interleaved within each split.
    rng.shuffle(train_rows)
    rng.shuffle(test_rows)
    splits = [("train", train_rows), ("test", test_rows)]
    # JSONL
    for name, rows in splits:
        with open(out_path / f"{name}.jsonl", "w") as jf:
            jf.writelines(json.dumps(r) + "\n" for r in rows)
    # CSV
    csv_fields = ["video", "meta", "task_id", "source", "prompt"]
    for name, rows in splits:
        with open(out_path / f"{name}.csv", "w", newline="", encoding="utf-8") as cf:
            writer = csv.writer(cf)
            writer.writerow(csv_fields)
            for r in rows:
                writer.writerow([r[k] for k in csv_fields])
    tqdm.write(f" Split: {len(train_rows)} train / {len(test_rows)} test")
    tqdm.write(f" Written: train.jsonl, test.jsonl, train.csv, test.csv")
# ── Batch Processing ───────────────────────────────────────────────────────────
def process_all(
    data_dir: str = "data",
    output_dir: str = "videos",
    n_frames: int = 5,
    m_frames: int = 5,
    k_rate: float = 1.0,
    max_frames: int | None = None,
    fps: int = 15,
    repeat_num: int = 3,
    canvas_h: int = 720,
    canvas_w: int = 1280,
    train_ratio: float = 0.9,
    prompt: str = "Predict the test output grid based on the input-output training examples.",
) -> None:
    """Generate videos for all ARC tasks with train/test JSONL splits.

    Supports resumption via metadata cache. After generation, writes
    stratified train.jsonl / test.jsonl / CSV files.

    For each task file, ``repeat_num`` videos are produced, each with a
    distinct seeded color permutation; the seed is embedded in the video
    filename. Tasks whose first test pair lacks an "output" grid are
    skipped. If the cached parameters differ from the current ones, all
    previous output is cleared and generation restarts from scratch.
    """
    data_path = Path(data_dir)
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    current_params = _build_params_dict(
        data_dir, n_frames, m_frames, k_rate, max_frames, fps, repeat_num,
        canvas_h, canvas_w,
    )
    existing_meta = _load_metadata(out_path)
    if existing_meta is not None and existing_meta.get("params") == current_params:
        # Resume: trust only cache entries whose video file still exists.
        completed: set[str] = {
            name for name in existing_meta.get("completed", [])
            if (out_path / name).exists()
        }
        tqdm.write(f"Resuming: {len(completed)} videos already completed.")
    else:
        if existing_meta is not None:
            tqdm.write("Parameters changed — clearing and restarting.")
        # Fresh start: drop any stale videos/cache and write a new cache.
        _clear_output_dir(out_path)
        completed = set()
        _save_metadata(out_path, current_params, completed)
    task_files = sorted(
        list((data_path / "training").glob("*.json"))
        + list((data_path / "evaluation").glob("*.json"))
    )
    if not task_files:
        print(f"No task files found in {data_path}/training or {data_path}/evaluation")
        return
    total = len(task_files) * repeat_num
    pbar = tqdm(total=total, desc="Tasks", unit="vid", initial=len(completed))
    save_every = 20        # persist the resume cache every N new videos
    new_since_save = 0
    all_samples: list[dict] = []
    for fpath in task_files:
        task_id = fpath.stem
        source = fpath.parent.name  # "training" or "evaluation"
        with open(fpath) as f:
            task_raw = json.load(f)
        # Skip tasks with no test pair or a hidden (absent) test output.
        if not task_raw.get("test") or "output" not in task_raw["test"][0]:
            pbar.update(repeat_num)
            continue
        test_out_arr = np.array(task_raw["test"][0]["output"])
        grid_rows, grid_cols = test_out_arr.shape
        used_perms: set[tuple[int, ...]] = set()
        seed = 0
        generated = 0
        # Walk seeds upward until repeat_num DISTINCT permutations are found
        # (different seeds can shuffle to the same permutation).
        while generated < repeat_num:
            perm = generate_color_permutation(seed)
            perm_key = tuple(perm)
            if perm_key not in used_perms:
                used_perms.add(perm_key)
                video_name = f"{task_id}_{seed}.mp4"
                meta_name = f"{task_id}_{seed}.meta.json"
                sample_meta = {
                    "task_id": task_id,
                    "source": source,
                    "seed": seed,
                    "video": video_name,
                    "meta": meta_name,
                    "prompt": prompt,
                    "grid_rows": int(grid_rows),
                    "grid_cols": int(grid_cols),
                    "color_perm": perm,
                    "n_train_pairs": len(task_raw["train"]),
                }
                # Only render if not already produced in a previous run;
                # either way the sample is recorded for the split files.
                if video_name not in completed:
                    permuted_task = permute_task(task_raw, perm)
                    pbar.set_postfix_str(f"{task_id}_{seed}")
                    video_file = str(out_path / video_name)
                    frame_count = generate_video(
                        permuted_task, video_file,
                        n_frames=n_frames, m_frames=m_frames, k_rate=k_rate,
                        max_frames=max_frames, fps=fps,
                        canvas_h=canvas_h, canvas_w=canvas_w,
                    )
                    sample_meta["frame_count"] = frame_count
                    meta_file = video_file.replace(".mp4", ".meta.json")
                    save_video_metadata(
                        task=permuted_task, perm=perm, seed=seed,
                        canvas_h=canvas_h, canvas_w=canvas_w, meta_path=meta_file,
                    )
                    completed.add(video_name)
                    pbar.update(1)
                    new_since_save += 1
                    # Periodic checkpoint so an interrupted run can resume.
                    if new_since_save >= save_every:
                        _save_metadata(out_path, current_params, completed)
                        new_since_save = 0
                all_samples.append(sample_meta)
                generated += 1
            seed += 1
            # Safety valve: give up on this task if unique perms run dry.
            if seed > repeat_num + 1000:
                tqdm.write(f"Warning: could not generate {repeat_num} unique perms for {task_id}")
                pbar.update(repeat_num - generated)
                break
    pbar.close()
    _save_metadata(out_path, current_params, completed)
    # Write train/test splits
    _write_splits(all_samples, out_path, train_ratio)
    tqdm.write(f"Done. {len(completed)} videos, {len(all_samples)} samples in {out_path}/")
# ── CLI ────────────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="ARC-AGI-2 Video Generator")
p.add_argument("--data_dir", type=str, default="ARC-AGI-2/data")
p.add_argument("--output_dir", type=str, default="videos")
p.add_argument("--n_frames", type=int, default=5)
p.add_argument("--m_frames", type=int, default=5)
p.add_argument("--k_rate", type=float, default=1.0)
p.add_argument("--max_frames", type=int, default=None)
p.add_argument("--fps", type=int, default=15)
p.add_argument("--repeat_num", type=int, default=3)
p.add_argument("--resolution", type=int, nargs=2, default=[720, 1280],
metavar=("H", "W"))
p.add_argument("--train_ratio", type=float, default=0.9,
help="Train split ratio (default: 0.9)")
p.add_argument("--prompt", type=str,
default="Predict the test output grid based on the input-output training examples.")
return p.parse_args()
if __name__ == "__main__":
    # CLI entry point: forward parsed options to the batch generator.
    cli = parse_args()
    height, width = cli.resolution
    process_all(
        data_dir=cli.data_dir,
        output_dir=cli.output_dir,
        n_frames=cli.n_frames,
        m_frames=cli.m_frames,
        k_rate=cli.k_rate,
        max_frames=cli.max_frames,
        fps=cli.fps,
        repeat_num=cli.repeat_num,
        canvas_h=height,
        canvas_w=width,
        train_ratio=cli.train_ratio,
        prompt=cli.prompt,
    )