microfactory-lab / scripts /assemble-video.py
kylebrodeur's picture
space: clean sync from chief-engineer root incl. screenshots and latest README
d7337a8 verified
Raw
History Blame Contribute Delete
7.45 kB
#!/usr/bin/env python3
"""Assemble the final Microfactory demo video from short beat clips + VO.
No Premiere/Cap Studio edit session required. Each beat is exported from Cap
and then combined here with:
- voice-over audio per beat
- burned-in captions in the bottom safe zone
- optional camera open/close clips
- all inputs scaled/padded to a consistent output resolution
Usage:
uv run python scripts/assemble-video.py recordings/manifest.json
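Output:
recordings/output/microfactory-node-demo.mp4 by default; set "output" in the
manifest (resolved relative to the manifest's directory) to write elsewhere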
Manifest format (recordings/manifest.example.json is a starter):
{
"output_resolution": [1707, 1067],
"output_fps": 60,
"font_file": "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"segments": [
{"type": "camera", "video": "camera/open.mp4"},
{"type": "screen", "video": "beats/load.mp4", "audio": "vo/load.wav",
"caption": "I give it the part, the material, and the room..."},
...
{"type": "camera", "video": "camera/close.mp4"}
]
}
"""
from __future__ import annotations
import json
import os
import re
import shutil
import subprocess
import sys
import textwrap
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional
def run(cmd: List[str], **kwargs) -> None:
    """Echo *cmd* in shell-prompt form, then execute it.

    Any extra keyword arguments are forwarded to ``subprocess.run``. The
    command is run with ``check=True``, so a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    rendered = " ".join(map(str, cmd))
    print("$ " + rendered)
    subprocess.run(cmd, check=True, **kwargs)
def duration(path: str) -> float:
    """Return the container duration of *path* in seconds, as reported by ffprobe.

    Raises ``subprocess.CalledProcessError`` when ffprobe exits non-zero and
    ``ValueError`` when its output is not a number.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    seconds_text = probe.stdout.strip()
    return float(seconds_text)
def has_audio(path: str) -> bool:
    """Report whether ffprobe lists at least one audio stream in *path*.

    A failing ffprobe call (non-zero exit) is treated as "no audio" instead
    of being raised to the caller.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=codec_type",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        return False
    # ffprobe prints one line per matching stream; no output means no audio.
    return probe.stdout.strip() != ""
def wrap_caption(text: str, width: int = 60) -> str:
    """Wrap *text* to at most *width* columns and return it as multi-line text.

    The wrapped lines are joined with real newline characters. The result is
    written to a file that ffmpeg's drawtext filter reads via ``textfile``;
    drawtext breaks lines on actual newlines, whereas a literal backslash
    followed by "n" is treated as an escape and rendered as a plain "n".

    Args:
        text: Caption text to wrap.
        width: Maximum line length in characters.

    Returns:
        The wrapped caption. Text that produces no wrapped lines (for example
        an empty string) is returned unchanged.
    """
    lines = textwrap.wrap(text, width=width) or [text]
    return "\n".join(lines)
def _escape_filter_value(value: str) -> str:
    """Escape *value* so it survives both parsing levels of ``-filter_complex``."""
    # Level 1 (the filter's own option parser): "\", "'" and ":" are special.
    option_level = re.sub(r"([\\':])", r"\\\1", value)
    # Level 2 (the filtergraph parser): "\", "'", "[", "]", "," and ";" are special.
    return re.sub(r"([\\'\[\],;])", r"\\\1", option_level)


def build_segment(
    video_path: str,
    audio_path: Optional[str],
    caption: Optional[str],
    output_path: str,
    width: int,
    height: int,
    fps: int,
    font_file: str,
    tmp_dir: Optional[str] = None,
) -> None:
    """Render one segment to *output_path* with ffmpeg.

    The video is scaled and padded to ``width`` x ``height``, stretched
    (last frame held) or trimmed to the target duration, resampled to *fps*,
    and optionally overlaid with a caption box near the bottom of the frame.

    Args:
        video_path: Source clip.
        audio_path: Optional voice-over. When given, its duration becomes the
            segment duration and it replaces the clip's own audio.
        caption: Optional caption text, burned in with drawtext.
        output_path: Destination file for the encoded segment.
        width: Output width in pixels.
        height: Output height in pixels.
        fps: Output frame rate.
        font_file: Path to the font used for the caption.
        tmp_dir: Directory for the intermediate caption text file. When
            omitted, a temporary file is created and removed afterwards.

    Raises:
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits non-zero.
    """
    audio_dur = duration(audio_path) if audio_path else None
    video_dur = duration(video_path)
    # The voice-over, when present, dictates how long the segment runs.
    target = video_dur if audio_dur is None else audio_dur

    # Video filter chain
    vfilters: List[str] = [
        f"scale={width}:{height}:force_original_aspect_ratio=decrease",
        f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2",
        "setsar=1",
    ]
    # 50 ms of tolerance avoids padding/trimming over probe rounding noise.
    if target > video_dur + 0.05:
        # Clip is shorter than the target: hold the last frame.
        vfilters.append(f"tpad=stop_mode=clone:stop_duration={target - video_dur:.3f}")
    elif target < video_dur - 0.05:
        vfilters.append(f"trim=0:{target:.3f},setpts=PTS-STARTPTS")
    vfilters.append(f"fps={fps}")

    # Set only when this call created a caption file that it must delete itself.
    scratch_caption: Optional[str] = None
    try:
        if caption:
            # drawtext breaks lines on real newlines only; an escaped "\n"
            # sequence would otherwise end up in the rendered caption.
            caption_text = wrap_caption(caption).replace("\\n", "\n")
            if tmp_dir:
                caption_file = os.path.join(
                    tmp_dir, f"caption_{os.path.basename(output_path)}.txt"
                )
                with open(caption_file, "w", encoding="utf-8") as f:
                    f.write(caption_text)
            else:
                import tempfile

                # Close the handle before ffmpeg reads the file; delete=False
                # keeps the file on disk until the finally block removes it.
                with tempfile.NamedTemporaryFile(
                    mode="w", suffix=".txt", delete=False, encoding="utf-8"
                ) as handle:
                    handle.write(caption_text)
                caption_file = handle.name
                scratch_caption = caption_file
            vfilters.append(
                f"drawtext=fontfile={_escape_filter_value(font_file)}:"
                f"textfile={_escape_filter_value(caption_file)}:"
                # Render the caption verbatim so "%" and "\" in the text are
                # not interpreted as drawtext expansion syntax.
                f"expansion=none:"
                f"fontcolor=white:fontsize=28:box=1:boxcolor=black@0.65:boxborderw=10:"
                f"x=(w-text_w)/2:y=h-text_h-40"
            )

        # Audio filter chain
        inputs = ["-i", video_path]
        audio_source: Optional[str]
        if audio_path:
            inputs += ["-i", audio_path]
            audio_source = "[1:a]"
        elif has_audio(video_path):
            audio_source = "[0:a]"
        else:
            audio_source = None

        if audio_source is not None:
            audio_filter = (
                # Loop the source so atrim can always cut exactly `target` seconds.
                f"{audio_source}aloop=loop=-1:size=10000000,"
                f"atrim=0:{target:.3f},asetpts=PTS-STARTPTS,"
                # Match the 48 kHz stereo layout of the silent branch so every
                # segment has identical audio parameters when concatenated.
                f"aformat=sample_rates=48000:channel_layouts=stereo[aout]"
            )
        else:
            # No audio anywhere: synthesize silence of the right length.
            audio_filter = (
                f"anullsrc=channel_layout=stereo:sample_rate=48000:"
                f"duration={target:.3f}[aout]"
            )

        filter_complex = ";".join([f"[0:v]{','.join(vfilters)}[vout]", audio_filter])
        cmd = [
            "ffmpeg", "-y", *inputs,
            "-filter_complex", filter_complex,
            "-map", "[vout]", "-map", "[aout]",
            "-c:v", "libx264", "-preset", "fast", "-crf", "18",
            "-c:a", "aac", "-b:a", "192k",
            "-r", str(fps), "-t", f"{target:.3f}",
            output_path,
        ]
        run(cmd)
    finally:
        if scratch_caption is not None:
            try:
                os.remove(scratch_caption)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the
                # real outcome of the encode.
                pass
def assemble(segments: List[dict], output_path: str, width: int, height: int, fps: int) -> None:
    """Encode every segment and concatenate them into *output_path*.

    Each segment is rendered to an intermediate MP4 inside a temporary
    directory by ``build_segment``; the intermediates are then joined with
    ffmpeg's concat demuxer. The temporary directory is removed on exit.

    Args:
        segments: Dicts with a required ``"video"`` key and optional
            ``"audio"``, ``"caption"`` and ``"font_file"`` keys.
        output_path: Destination of the final video.
        width: Output width in pixels; odd values are rounded up to even.
        height: Output height in pixels; odd values are rounded up to even.
        fps: Output frame rate.

    Raises:
        ValueError: If *segments* is empty.
        subprocess.CalledProcessError: If an ffmpeg/ffprobe call fails.
    """
    if not segments:
        # An empty concat list only fails later, inside ffmpeg, with an
        # opaque error; report the real cause up front.
        raise ValueError('no segments to assemble; the manifest "segments" list is empty')

    # H.264/HEVC encoders generally require even dimensions.
    if width % 2 or height % 2:
        print(f"Warning: output resolution {width}x{height} has odd dimensions; rounding up to even.")
        width += width % 2
        height += height % 2
        print(f"Using {width}x{height} for encoding.")

    with TemporaryDirectory(prefix="assemble_") as tmp:
        seg_files: List[str] = []
        for idx, seg in enumerate(segments, 1):
            seg_out = os.path.join(tmp, f"seg_{idx:03d}.mp4")
            build_segment(
                video_path=seg["video"],
                audio_path=seg.get("audio"),
                caption=seg.get("caption"),
                output_path=seg_out,
                width=width,
                height=height,
                fps=fps,
                font_file=seg.get("font_file", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"),
                tmp_dir=tmp,
            )
            seg_files.append(seg_out)

        list_file = os.path.join(tmp, "concat.txt")
        with open(list_file, "w", encoding="utf-8") as f:
            for path in seg_files:
                # Inside a single-quoted concat entry a quote is written by
                # closing the quote, adding an escaped quote, and reopening.
                quoted = path.replace("'", "'\\''")
                f.write(f"file '{quoted}'\n")

        run([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file,
            "-c:v", "libx264", "-preset", "fast", "-crf", "18",
            "-c:a", "aac", "-b:a", "192k",
            output_path,
        ])
def main() -> None:
    """Command-line entry point: read the manifest and build the final video.

    The manifest path is taken from ``sys.argv[1]``. The process changes its
    working directory to the manifest's directory, so every relative path in
    the manifest (videos, audio, output) is resolved against it.

    Exits with status 1 when no manifest path is given or when ffmpeg/ffprobe
    cannot be found on PATH.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <manifest.json>", file=sys.stderr)
        sys.exit(1)

    # Fail early with a readable message instead of a FileNotFoundError
    # traceback from the first subprocess call.
    missing_tools = [tool for tool in ("ffmpeg", "ffprobe") if shutil.which(tool) is None]
    if missing_tools:
        print(
            f"Error: required tool(s) not found on PATH: {', '.join(missing_tools)}",
            file=sys.stderr,
        )
        sys.exit(1)

    manifest_path = Path(sys.argv[1])
    # JSON is UTF-8; do not depend on the locale's default encoding, since
    # captions may contain non-ASCII characters.
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)

    base_dir = manifest_path.parent
    os.chdir(base_dir)

    width, height = manifest.get("output_resolution", [1708, 1068])
    fps = manifest.get("output_fps", 60)
    font_file = manifest.get(
        "font_file", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
    )

    # Resolve paths to absolute form now, while the working directory is the
    # manifest's directory.
    segments: List[dict] = []
    for seg in manifest.get("segments", []):
        resolved = {
            "video": str(Path(seg["video"]).resolve()),
            "audio": str(Path(seg["audio"]).resolve()) if seg.get("audio") else None,
            "caption": seg.get("caption"),
            "font_file": font_file,
        }
        segments.append(resolved)

    output = manifest.get("output", "output/microfactory-node-demo.mp4")
    output_path = Path(output).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    assemble(segments, str(output_path), width, height, fps)
    print(f"\n✓ Final video: {output_path}")


if __name__ == "__main__":
    main()