# SyncAI — src/assembler.py
# (Hugging Face upload metadata: ICGenAIShare04, "Upload 52 files", commit 72f552e verified)
"""FFmpeg video stitching, clip splitting/shuffling, lyrics overlay.
Takes generated video clips (one per 4-beat segment), splits each into
two halves, shuffles them with a distance constraint, builds a timeline
with dynamic pacing (4-beat cuts before the drop, 2-beat after), overlays
audio and lyrics text.
"""
import json
import random
import subprocess
import tempfile
from pathlib import Path
def _get_audio_path(run_dir: Path) -> Path:
"""Find the original audio file one level above the run directory."""
song_dir = run_dir.parent
for ext in [".wav", ".mp3", ".flac", ".m4a"]:
candidates = list(song_dir.glob(f"*{ext}"))
if candidates:
return candidates[0]
raise FileNotFoundError(f"No audio file found in {song_dir}")
def _get_clip_duration(clip_path: Path) -> float:
    """Return the duration of *clip_path* in seconds, probed via ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        str(clip_path),
    ]
    probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return float(probe.stdout.strip())
def _get_clip_fps(clip_path: Path) -> float:
    """Return the frame rate of the first video stream using ffprobe.

    ffprobe reports ``r_frame_rate`` as a rational such as ``"30000/1001"``;
    some containers emit a bare integer with no slash, which the original
    ``split("/")`` crashed on — both forms are handled here.

    Raises:
        ZeroDivisionError: if ffprobe reports a zero denominator.
        subprocess.CalledProcessError: if ffprobe fails.
    """
    result = subprocess.run([
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=r_frame_rate",
        "-of", "csv=p=0",
        str(clip_path),
    ], capture_output=True, text=True, check=True)
    rate = result.stdout.strip()
    # partition() tolerates a missing "/" (den empty -> bare number).
    num, _, den = rate.partition("/")
    return int(num) / int(den) if den else float(num)
def _trim_clip(clip_path: Path, start: float, duration: float, output_path: Path):
    """Cut *duration* seconds from *clip_path* starting at *start*.

    Re-encodes the video with libx264, drops the audio track, and writes
    the result to *output_path*.
    """
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-ss", f"{start:.3f}",
            "-i", str(clip_path),
            "-t", f"{duration:.3f}",
            "-c:v", "libx264", "-preset", "fast",
            "-an",
            str(output_path),
        ],
        check=True,
        capture_output=True,
    )
# ---------------------------------------------------------------------------
# Ken Burns effects — subtle pan/zoom applied per slot for added motion
# ---------------------------------------------------------------------------
# Zoom factor: 8% total movement over the clip duration
_KB_ZOOM = 0.45
KEN_BURNS_EFFECTS = [
"zoom_in",
"zoom_out",
]
def _ken_burns_filter(
effect: str, n_frames: int, width: int, height: int,
) -> str:
"""Build an FFmpeg filter for a smooth Ken Burns zoom effect on video.
Upscales the video 4x before applying zoompan with d=1 (one output
frame per input frame), then scales back to original size. The 4x
upscale makes integer rounding in zoompan negligible, eliminating
visible jitter.
"""
z = _KB_ZOOM
N = max(n_frames, 1)
W, H = width, height
# Upscale factor — higher = smoother but slower
UP = 8
UW, UH = W * UP, H * UP
if effect == "zoom_in":
zoom_expr = f"1+{z}*on/{N}"
elif effect == "zoom_out":
zoom_expr = f"1+{z}-{z}*on/{N}"
else:
return f"scale={W}:{H}"
return (
f"scale={UW}:{UH}:flags=lanczos,"
f"zoompan=z='{zoom_expr}':"
f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':"
f"d=1:s={UW}x{UH},"
f"scale={W}:{H}:flags=lanczos"
)
def _get_clip_dimensions(clip_path: Path) -> tuple[int, int]:
    """Return (width, height) of the first video stream in *clip_path*."""
    probe = subprocess.run(
        ["ffprobe", "-v", "error", "-select_streams", "v:0",
         "-show_entries", "stream=width,height",
         "-of", "csv=s=x:p=0", str(clip_path)],
        capture_output=True, text=True, check=True,
    )
    width, height = probe.stdout.strip().split("x")
    return int(width), int(height)
def _split_clip(clip_path: Path, clip_id: int) -> dict:
    """Register a clip's two halves without pre-splitting.

    The "first" half plays from the start, the "second" half plays from
    the end (offset back by the slot duration at trim time). This makes
    the two halves maximally different — no fixed midpoint split.

    Returns:
        Dict with the original path and full duration for each half.
    """
    full_duration = _get_clip_duration(clip_path)
    record: dict = {"clip_id": clip_id}
    record.update(first=clip_path, second=clip_path)
    record.update(first_duration=full_duration, second_duration=full_duration)
    return record
def _build_sub_segments(segments: list[dict], drop_time: float | None) -> list[dict]:
"""Build the final timeline of sub-segments.
Before the drop: one slot per 4-beat segment.
After the drop: each 4-beat segment splits into two 2-beat slots
using the beat timestamps stored in the segment.
"""
sub_segments = []
for seg in segments:
beats = seg.get("beats", [seg["start"], seg["end"]])
is_after_drop = drop_time is not None and seg["start"] >= drop_time
if is_after_drop and len(beats) >= 3:
# Split at midpoint beat (beat 2 of 4)
mid_idx = len(beats) // 2
mid_time = beats[mid_idx]
sub_segments.append({
"start": seg["start"],
"end": mid_time,
"duration": round(mid_time - seg["start"], 3),
"lyrics": seg.get("lyrics", ""),
"parent_segment": seg["segment"],
})
sub_segments.append({
"start": mid_time,
"end": seg["end"],
"duration": round(seg["end"] - mid_time, 3),
"lyrics": "", # lyrics stay on the first half
"parent_segment": seg["segment"],
})
else:
# Before drop: one slot for the full 4-beat segment
sub_segments.append({
"start": seg["start"],
"end": seg["end"],
"duration": seg["duration"],
"lyrics": seg.get("lyrics", ""),
"parent_segment": seg["segment"],
})
return sub_segments
def _shuffle_with_distance(pool: list[tuple], n_slots: int) -> list[tuple]:
"""Select n_slots sub-clips maximising clip diversity and spacing.
Shuffles clip IDs once, then repeats that order to fill all slots.
First pass uses "first" halves, second pass uses "second" halves.
Same clip is always exactly n_clips positions apart — maximum spacing.
Each item is (clip_id, half_label, path, duration).
"""
by_clip: dict[int, list[tuple]] = {}
for item in pool:
by_clip.setdefault(item[0], []).append(item)
clip_ids = list(by_clip.keys())
random.shuffle(clip_ids)
# Repeat the shuffled order: [4,5,1,2,6,3, 4,5,1,2,6,3, ...]
result = []
cycle = 0
while len(result) < n_slots:
for cid in clip_ids:
if len(result) >= n_slots:
break
halves = by_clip[cid]
# First cycle uses "first" half, second cycle uses "second", etc.
half_idx = cycle % len(halves)
result.append(halves[half_idx])
cycle += 1
return result
# Font registry — maps display names to .ttf filenames in fonts/
FONTS = {
"Bebas Neue": "BebasNeue-Regular.ttf",
"Teko": "Teko-Bold.ttf",
"Russo One": "RussoOne-Regular.ttf",
"Staatliches": "Staatliches-Regular.ttf",
}
DEFAULT_FONT = "Bebas Neue"
DEFAULT_FONT_COLOR = "#FFF7D4"
_FONTS_DIR = Path(__file__).resolve().parent.parent / "fonts"
def font_names() -> list[str]:
"""Return list of available font display names."""
return list(FONTS.keys())
def _get_font_path(font_name: str) -> Path:
"""Resolve a font display name to its .ttf file path."""
filename = FONTS.get(font_name, FONTS[DEFAULT_FONT])
return _FONTS_DIR / filename
_SPOTIFY_BADGE = Path(__file__).resolve().parent.parent / "assets" / "spotify_badge.png"
def _add_lyrics_overlay(
    video_path: Path,
    segments: list[dict],
    output_path: Path,
    audio_offset: float,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: Path | None = None,
    drop_time: float | None = None,
    song_name: str = "",
):
    """Add lyrics text and optional cover art overlay using FFmpeg filters.

    Args:
        video_path: Input video (audio already muxed in).
        segments: Segment dicts; per-word timing is read from each
            segment's "words" list ({"word", "start", "end"} keys,
            timestamps in source-audio time).
        output_path: Where the overlaid video is written.
        audio_offset: Start of the video within the source audio;
            subtracted from every word/drop timestamp to get video time.
        font_name: Display name resolved through the FONTS registry.
        font_color: Color for all drawtext output.
        cover_art: Optional cover image, shown from the drop onwards.
            Requires drop_time to be set as well.
        drop_time: Drop timestamp in source-audio time.
        song_name: Title rendered below the cover art.
    """
    font_path = _get_font_path(font_name)
    # If cover art provided, lyrics stop at the drop
    lyrics_cutoff = None
    if cover_art is not None and drop_time is not None:
        lyrics_cutoff = drop_time
    # Collect all words with timestamps
    all_words = []
    for seg in segments:
        for word_info in seg.get("words", []):
            word = word_info["word"].strip().lower()
            if not word:
                continue
            w_start = word_info["start"]
            w_end = word_info["end"]
            # Skip words that start after the cutoff
            if lyrics_cutoff is not None and w_start >= lyrics_cutoff:
                continue
            # Clamp end to cutoff for words that span the drop
            if lyrics_cutoff is not None and w_end > lyrics_cutoff:
                w_end = lyrics_cutoff
            all_words.append({"word": word, "start": w_start, "end": w_end})
    # Close small gaps: both words meet in the middle of the gap
    gap_threshold = 0.5
    for i in range(len(all_words) - 1):
        gap = all_words[i + 1]["start"] - all_words[i]["end"]
        if 0 < gap < gap_threshold:
            mid = all_words[i]["end"] + gap / 2
            all_words[i]["end"] = mid
            all_words[i + 1]["start"] = mid
    # Build drawtext filter chain — one filter per word, timed to speech
    drawtext_filters = []
    for w in all_words:
        # Escape for drawtext. Apostrophes become U+2019 (typographic
        # quote) because a literal ' cannot appear inside the
        # single-quoted text='...' value.
        escaped = (w["word"]
                   .replace("\\", "\\\\")
                   .replace("'", "\u2019")
                   .replace('"', '\\"')
                   .replace(":", "\\:")
                   .replace("%", "%%")
                   .replace("[", "\\[")
                   .replace("]", "\\]"))
        # Shift word times from source-audio time into video time.
        start = w["start"] - audio_offset
        end = w["end"] - audio_offset
        drawtext_filters.append(
            f"drawtext=text='{escaped}'"
            f":fontfile='{font_path}'"
            f":fontsize=36"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y=(h-text_h)/2"
            f":enable='between(t,{start:.3f},{end:.3f})'"
        )
    has_cover = cover_art is not None and drop_time is not None
    has_lyrics = len(drawtext_filters) > 0
    if not has_cover and not has_lyrics:
        # Nothing to draw — pass the video through with a stream copy.
        subprocess.run([
            "ffmpeg", "-y", "-i", str(video_path),
            "-c", "copy", str(output_path),
        ], check=True, capture_output=True)
        return
    if has_cover:
        # Overlays enabled from the drop (in video time) to the end.
        drop_start = drop_time - audio_offset
        enable = f"enable='gte(t,{drop_start:.3f})'"
        # --- Cover art layout (change these to adjust) ---
        art_h = 270  # cover art height in px
        art_y_offset = 10  # px below center (positive = down)
        badge_h = 56  # spotify badge height in px
        # Probe video height for position calculations
        vid_h = int(subprocess.run([
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=height", "-of", "csv=p=0",
            str(video_path),
        ], capture_output=True, text=True, check=True).stdout.strip())
        art_center = vid_h / 2 + art_y_offset
        art_top = art_center - art_h / 2
        art_bottom = art_center + art_h / 2
        # Square = 9:16 crop region (side = vid_h * 9/16)
        sq_side = vid_h * 9 / 16
        sq_top = (vid_h - sq_side) / 2
        sq_bottom = (vid_h + sq_side) / 2
        # Badge centered between square top and art top
        badge_center_y = (sq_top + art_top) / 2
        badge_y = int(badge_center_y - badge_h / 2)
        # Title centered between art bottom and square bottom
        title_center_y = int((art_bottom + sq_bottom) / 2)
        art_overlay_y = int(art_center - art_h / 2)
        # Filter-graph inputs: 0 = video, 1 = cover art, 2 = badge.
        parts = [
            f"[1:v]scale=-2:{art_h}:flags=lanczos[art]",
            f"[2:v]scale=-2:{badge_h}:flags=lanczos[badge]",
            f"[0:v][art]overlay=(W-w)/2:{art_overlay_y}:{enable}[v1]",
            f"[v1][badge]overlay=(W-w)/2:{badge_y}:{enable}",
        ]
        # Add song title drawtext below cover art
        title_escaped = (song_name
                         .replace("\\", "\\\\")
                         .replace("'", "\u2019")
                         .replace('"', '\\"')
                         .replace(":", "\\:")
                         .replace("%", "%%"))
        title_text = f'\\"{title_escaped}\\" out now!'.lower()
        parts[-1] += (
            f",drawtext=text='{title_text}'"
            f":fontfile='{font_path}'"
            f":fontsize=40"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y={title_center_y}-text_h/2"
            f":{enable}"
        )
        # Chain drawtext lyrics filters
        if has_lyrics:
            parts[-1] += "," + ",".join(drawtext_filters)
        filter_chain = ";".join(parts)
        cmd = [
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-i", str(cover_art),
            "-i", str(_SPOTIFY_BADGE),
            "-filter_complex", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
    else:
        # Lyrics only, no cover art
        filter_chain = ",".join(drawtext_filters)
        subprocess.run([
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-vf", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ], check=True, capture_output=True)
def assemble(
    run_dir: str | Path,
    audio_path: str | Path | None = None,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Assemble final video with dynamic pacing, clip shuffling, and lyrics.

    Pipeline: register clip halves -> build slot timeline -> shuffle
    halves into slots -> frame-accurate trim with Ken Burns motion ->
    concat -> mux audio -> lyrics/cover overlay -> 9:16 crop.

    Args:
        run_dir: Run directory containing clips/, segments.json, drop.json.
        audio_path: Path to the original audio. Auto-detected if None.
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text (e.g. '#FFF7D4').
        cover_art: Path to cover art image. Overlayed from the drop onwards.

    Returns:
        Path to the final video file.

    Raises:
        FileNotFoundError: if no clips exist or no audio can be found.
        subprocess.CalledProcessError: if any ffmpeg/ffprobe call fails.
    """
    run_dir = Path(run_dir)
    clips_dir = run_dir / "clips"
    output_dir = run_dir / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(run_dir / "segments.json") as f:
        segments = json.load(f)
    # Load drop time
    drop_time = None
    drop_path = run_dir / "drop.json"
    if drop_path.exists():
        with open(drop_path) as f:
            drop_time = json.load(f).get("drop_time")
        # NOTE(review): if drop.json exists but has no "drop_time" key,
        # drop_time is None here and the :.3f format below raises
        # TypeError — confirm upstream always writes the key.
        print(f" Drop at {drop_time:.3f}s")
    else:
        print(" No drop detected — using uniform pacing")
    if audio_path is None:
        audio_path = _get_audio_path(run_dir)
    audio_path = Path(audio_path)
    # --- Step 1: Register clip halves (no pre-splitting needed) ---
    sub_clips = []  # list of (clip_id, half, path, full_duration)
    for seg in segments:
        idx = seg["segment"]
        clip_path = clips_dir / f"clip_{idx:03d}.mp4"
        if not clip_path.exists():
            print(f" Warning: {clip_path.name} not found, skipping")
            continue
        halves = _split_clip(clip_path, idx)
        sub_clips.append((idx, "first", halves["first"], halves["first_duration"]))
        sub_clips.append((idx, "second", halves["second"], halves["second_duration"]))
        print(f" Registered {clip_path.name} ({halves['first_duration']:.1f}s)")
    if not sub_clips:
        raise FileNotFoundError(f"No clips found in {clips_dir}")
    # --- Step 2: Build sub-segment timeline ---
    sub_segments = _build_sub_segments(segments, drop_time)
    print(f" Timeline: {len(sub_segments)} slots "
          f"({len([s for s in sub_segments if s['duration'] < 1.5])} fast cuts)")
    # --- Step 3: Shuffle sub-clips into slots ---
    assigned = _shuffle_with_distance(sub_clips.copy(), n_slots=len(sub_segments))
    # --- Step 4: Frame-accurate trim of each sub-clip to slot duration ---
    # Detect FPS from first available sub-clip
    fps = _get_clip_fps(assigned[0][2])
    print(f" Source FPS: {fps}")
    trimmed_dir = run_dir / "clips_trimmed"
    trimmed_dir.mkdir(exist_ok=True)
    trimmed_paths = []
    # Get clip dimensions from the first available clip (all clips share resolution)
    clip_width, clip_height = _get_clip_dimensions(assigned[0][2])
    print(f" Clip resolution: {clip_width}x{clip_height}")
    # Track cumulative frames to prevent drift between cuts and beats:
    # each slot's frame count is derived from the rounded *cumulative*
    # target time, so per-slot rounding errors never accumulate.
    cumulative_frames = 0
    cumulative_target = 0.0
    for i, (sub_seg, (clip_id, half, clip_path, clip_dur)) in enumerate(
        zip(sub_segments, assigned)
    ):
        slot_dur = sub_seg["duration"]
        # A slot can never use more footage than the clip holds.
        cumulative_target += min(slot_dur, clip_dur)
        target_frame = round(cumulative_target * fps)
        n_frames = max(1, target_frame - cumulative_frames)
        cumulative_frames = target_frame
        # "first" half starts from 0, "second" half starts from end minus slot duration
        # This makes the two halves show maximally different frames
        if half == "second":
            ss = max(0, clip_dur - slot_dur)
        else:
            ss = 0
        # Apply Ken Burns effect — cycle through effects per slot
        effect = KEN_BURNS_EFFECTS[i % len(KEN_BURNS_EFFECTS)]
        vf = _ken_burns_filter(effect, n_frames, clip_width, clip_height)
        trimmed_path = trimmed_dir / f"slot_{i:03d}.mp4"
        # NOTE(review): int(fps) truncates fractional rates (e.g. NTSC
        # 29.97 -> 29), which would desync long videos — confirm sources
        # are integer-fps.
        cmd = [
            "ffmpeg", "-y",
            "-ss", f"{ss:.3f}",
            "-i", str(clip_path),
            "-frames:v", str(n_frames),
            "-vf", vf,
            "-c:v", "libx264", "-preset", "fast",
            "-r", str(int(fps)),
            "-an",
            str(trimmed_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        trimmed_paths.append(trimmed_path)
        actual_dur = n_frames / fps
        print(f" Slot {i}: clip {clip_id} ({half}, ss={ss:.1f}s, {effect}) → "
              f"{n_frames}f/{actual_dur:.3f}s (target {slot_dur:.3f}s)")
    # --- Step 5: Concatenate (copy, no re-encode to preserve timing) ---
    # Stream-copy concat is safe because every slot was just encoded
    # with identical codec settings above.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, dir=str(run_dir)
    ) as f:
        for p in trimmed_paths:
            f.write(f"file '{p.resolve()}'\n")
        concat_list = f.name
    concat_path = output_dir / "video_only.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", concat_list,
        "-c", "copy",
        str(concat_path),
    ], check=True, capture_output=True)
    # --- Step 6: Overlay audio ---
    # Audio starts where the first segment starts (-ss on input 1);
    # -t plus -shortest clamp it to the video's exact length.
    audio_start = segments[0]["start"]
    video_duration = cumulative_frames / fps  # actual frame-accurate duration
    with_audio_path = output_dir / "with_audio.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-i", str(concat_path),
        "-ss", f"{audio_start:.3f}",
        "-i", str(audio_path),
        "-t", f"{video_duration:.3f}",
        "-c:v", "copy",
        "-c:a", "aac", "-b:a", "192k",
        "-map", "0:v:0", "-map", "1:a:0",
        "-shortest",
        str(with_audio_path),
    ], check=True, capture_output=True)
    # --- Step 7: Lyrics + cover art overlay ---
    overlay_path = output_dir / "with_overlay.mp4"
    cover_path = Path(cover_art) if cover_art else None
    # Song name comes from the directory layout: data/<song>/<run>/.
    song_name = run_dir.parent.name
    _add_lyrics_overlay(with_audio_path, segments, overlay_path, audio_start,
                        font_name=font_name, font_color=font_color,
                        cover_art=cover_path, drop_time=drop_time,
                        song_name=song_name)
    # --- Step 8: Crop to exact 9:16 ---
    # Width is forced even (2*floor(.../2)) as libx264 requires.
    final_path = output_dir / "final.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-i", str(overlay_path),
        "-vf", "crop=2*floor(ih*9/16/2):ih:(iw-2*floor(ih*9/16/2))/2:0",
        "-c:v", "libx264", "-preset", "fast",
        "-c:a", "copy",
        str(final_path),
    ], check=True, capture_output=True)
    # Clean up
    Path(concat_list).unlink(missing_ok=True)
    print(f"\nFinal video: {final_path}")
    print(f" Duration: {video_duration:.2f}s")
    print(f" Slots: {len(sub_segments)} ({len(segments)} original segments)")
    return final_path
def run(
    run_dir: str | Path,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Assemble final video from clips + audio.

    Thin wrapper around :func:`assemble` that announces the step first.

    Args:
        run_dir: Run directory (e.g. data/Gone/run_001/).
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text.
        cover_art: Path to cover art image (optional).

    Returns:
        Path to final video.
    """
    print("Assembling final video...")
    return assemble(
        run_dir,
        font_name=font_name,
        font_color=font_color,
        cover_art=cover_art,
    )
if __name__ == "__main__":
    import sys
    # Require exactly one positional argument: the run directory.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python -m src.assembler <run_dir>")
        print(" e.g. python -m src.assembler data/Gone/run_001")
        sys.exit(1)
    run(cli_args[0])