"""FFmpeg video stitching, clip splitting/shuffling, lyrics overlay. Takes generated video clips (one per 4-beat segment), splits each into two halves, shuffles them with a distance constraint, builds a timeline with dynamic pacing (4-beat cuts before the drop, 2-beat after), overlays audio and lyrics text. """ import json import random import subprocess import tempfile from pathlib import Path def _get_audio_path(run_dir: Path) -> Path: """Find the original audio file one level above the run directory.""" song_dir = run_dir.parent for ext in [".wav", ".mp3", ".flac", ".m4a"]: candidates = list(song_dir.glob(f"*{ext}")) if candidates: return candidates[0] raise FileNotFoundError(f"No audio file found in {song_dir}") def _get_clip_duration(clip_path: Path) -> float: """Get video duration in seconds using ffprobe.""" result = subprocess.run([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "csv=p=0", str(clip_path), ], capture_output=True, text=True, check=True) return float(result.stdout.strip()) def _get_clip_fps(clip_path: Path) -> float: """Get video frame rate using ffprobe.""" result = subprocess.run([ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=r_frame_rate", "-of", "csv=p=0", str(clip_path), ], capture_output=True, text=True, check=True) num, den = result.stdout.strip().split("/") return int(num) / int(den) def _trim_clip(clip_path: Path, start: float, duration: float, output_path: Path): """Trim a video clip from a start point to a duration using FFmpeg.""" cmd = [ "ffmpeg", "-y", "-ss", f"{start:.3f}", "-i", str(clip_path), "-t", f"{duration:.3f}", "-c:v", "libx264", "-preset", "fast", "-an", str(output_path), ] subprocess.run(cmd, check=True, capture_output=True) # --------------------------------------------------------------------------- # Ken Burns effects — subtle pan/zoom applied per slot for added motion # --------------------------------------------------------------------------- # Zoom factor: 8% total movement over the clip duration _KB_ZOOM = 0.45 KEN_BURNS_EFFECTS = [ "zoom_in", "zoom_out", ] def _ken_burns_filter( effect: str, n_frames: int, width: int, height: int, ) -> str: """Build an FFmpeg filter for a smooth Ken Burns zoom effect on video. Upscales the video 4x before applying zoompan with d=1 (one output frame per input frame), then scales back to original size. The 4x upscale makes integer rounding in zoompan negligible, eliminating visible jitter. """ z = _KB_ZOOM N = max(n_frames, 1) W, H = width, height # Upscale factor — higher = smoother but slower UP = 8 UW, UH = W * UP, H * UP if effect == "zoom_in": zoom_expr = f"1+{z}*on/{N}" elif effect == "zoom_out": zoom_expr = f"1+{z}-{z}*on/{N}" else: return f"scale={W}:{H}" return ( f"scale={UW}:{UH}:flags=lanczos," f"zoompan=z='{zoom_expr}':" f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':" f"d=1:s={UW}x{UH}," f"scale={W}:{H}:flags=lanczos" ) def _get_clip_dimensions(clip_path: Path) -> tuple[int, int]: """Get width and height of a video clip.""" result = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=s=x:p=0", str(clip_path)], capture_output=True, text=True, check=True, ) w, h = result.stdout.strip().split("x") return int(w), int(h) def _split_clip(clip_path: Path, clip_id: int) -> dict: """Register a clip's two halves without pre-splitting. The "first" half plays from the start, the "second" half plays from the end (offset back by the slot duration at trim time). This makes the two halves maximally different — no fixed midpoint split. Returns dict with the original path and full duration for each half. """ duration = _get_clip_duration(clip_path) return { "clip_id": clip_id, "first": clip_path, "second": clip_path, "first_duration": duration, "second_duration": duration, } def _build_sub_segments(segments: list[dict], drop_time: float | None) -> list[dict]: """Build the final timeline of sub-segments. Before the drop: one slot per 4-beat segment. After the drop: each 4-beat segment splits into two 2-beat slots using the beat timestamps stored in the segment. """ sub_segments = [] for seg in segments: beats = seg.get("beats", [seg["start"], seg["end"]]) is_after_drop = drop_time is not None and seg["start"] >= drop_time if is_after_drop and len(beats) >= 3: # Split at midpoint beat (beat 2 of 4) mid_idx = len(beats) // 2 mid_time = beats[mid_idx] sub_segments.append({ "start": seg["start"], "end": mid_time, "duration": round(mid_time - seg["start"], 3), "lyrics": seg.get("lyrics", ""), "parent_segment": seg["segment"], }) sub_segments.append({ "start": mid_time, "end": seg["end"], "duration": round(seg["end"] - mid_time, 3), "lyrics": "", # lyrics stay on the first half "parent_segment": seg["segment"], }) else: # Before drop: one slot for the full 4-beat segment sub_segments.append({ "start": seg["start"], "end": seg["end"], "duration": seg["duration"], "lyrics": seg.get("lyrics", ""), "parent_segment": seg["segment"], }) return sub_segments def _shuffle_with_distance(pool: list[tuple], n_slots: int) -> list[tuple]: """Select n_slots sub-clips maximising clip diversity and spacing. Shuffles clip IDs once, then repeats that order to fill all slots. First pass uses "first" halves, second pass uses "second" halves. Same clip is always exactly n_clips positions apart — maximum spacing. Each item is (clip_id, half_label, path, duration). """ by_clip: dict[int, list[tuple]] = {} for item in pool: by_clip.setdefault(item[0], []).append(item) clip_ids = list(by_clip.keys()) random.shuffle(clip_ids) # Repeat the shuffled order: [4,5,1,2,6,3, 4,5,1,2,6,3, ...] result = [] cycle = 0 while len(result) < n_slots: for cid in clip_ids: if len(result) >= n_slots: break halves = by_clip[cid] # First cycle uses "first" half, second cycle uses "second", etc. half_idx = cycle % len(halves) result.append(halves[half_idx]) cycle += 1 return result # Font registry — maps display names to .ttf filenames in fonts/ FONTS = { "Bebas Neue": "BebasNeue-Regular.ttf", "Teko": "Teko-Bold.ttf", "Russo One": "RussoOne-Regular.ttf", "Staatliches": "Staatliches-Regular.ttf", } DEFAULT_FONT = "Bebas Neue" DEFAULT_FONT_COLOR = "#FFF7D4" _FONTS_DIR = Path(__file__).resolve().parent.parent / "fonts" def font_names() -> list[str]: """Return list of available font display names.""" return list(FONTS.keys()) def _get_font_path(font_name: str) -> Path: """Resolve a font display name to its .ttf file path.""" filename = FONTS.get(font_name, FONTS[DEFAULT_FONT]) return _FONTS_DIR / filename _SPOTIFY_BADGE = Path(__file__).resolve().parent.parent / "assets" / "spotify_badge.png" def _add_lyrics_overlay( video_path: Path, segments: list[dict], output_path: Path, audio_offset: float, font_name: str = DEFAULT_FONT, font_color: str = DEFAULT_FONT_COLOR, cover_art: Path | None = None, drop_time: float | None = None, song_name: str = "", ): """Add lyrics text and optional cover art overlay using FFmpeg filters.""" font_path = _get_font_path(font_name) # If cover art provided, lyrics stop at the drop lyrics_cutoff = None if cover_art is not None and drop_time is not None: lyrics_cutoff = drop_time # Collect all words with timestamps all_words = [] for seg in segments: for word_info in seg.get("words", []): word = word_info["word"].strip().lower() if not word: continue w_start = word_info["start"] w_end = word_info["end"] # Skip words that start after the cutoff if lyrics_cutoff is not None and w_start >= lyrics_cutoff: continue # Clamp end to cutoff for words that span the drop if lyrics_cutoff is not None and w_end > lyrics_cutoff: w_end = lyrics_cutoff all_words.append({"word": word, "start": w_start, "end": w_end}) # Close small gaps: both words meet in the middle of the gap gap_threshold = 0.5 for i in range(len(all_words) - 1): gap = all_words[i + 1]["start"] - all_words[i]["end"] if 0 < gap < gap_threshold: mid = all_words[i]["end"] + gap / 2 all_words[i]["end"] = mid all_words[i + 1]["start"] = mid # Build drawtext filter chain — one filter per word, timed to speech drawtext_filters = [] for w in all_words: escaped = (w["word"] .replace("\\", "\\\\") .replace("'", "\u2019") .replace('"', '\\"') .replace(":", "\\:") .replace("%", "%%") .replace("[", "\\[") .replace("]", "\\]")) start = w["start"] - audio_offset end = w["end"] - audio_offset drawtext_filters.append( f"drawtext=text='{escaped}'" f":fontfile='{font_path}'" f":fontsize=36" f":fontcolor={font_color}" f":x=(w-text_w)/2:y=(h-text_h)/2" f":enable='between(t,{start:.3f},{end:.3f})'" ) has_cover = cover_art is not None and drop_time is not None has_lyrics = len(drawtext_filters) > 0 if not has_cover and not has_lyrics: subprocess.run([ "ffmpeg", "-y", "-i", str(video_path), "-c", "copy", str(output_path), ], check=True, capture_output=True) return if has_cover: drop_start = drop_time - audio_offset enable = f"enable='gte(t,{drop_start:.3f})'" # --- Cover art layout (change these to adjust) --- art_h = 270 # cover art height in px art_y_offset = 10 # px below center (positive = down) badge_h = 56 # spotify badge height in px # Probe video height for position calculations vid_h = int(subprocess.run([ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=height", "-of", "csv=p=0", str(video_path), ], capture_output=True, text=True, check=True).stdout.strip()) art_center = vid_h / 2 + art_y_offset art_top = art_center - art_h / 2 art_bottom = art_center + art_h / 2 # Square = 9:16 crop region (side = vid_h * 9/16) sq_side = vid_h * 9 / 16 sq_top = (vid_h - sq_side) / 2 sq_bottom = (vid_h + sq_side) / 2 # Badge centered between square top and art top badge_center_y = (sq_top + art_top) / 2 badge_y = int(badge_center_y - badge_h / 2) # Title centered between art bottom and square bottom title_center_y = int((art_bottom + sq_bottom) / 2) art_overlay_y = int(art_center - art_h / 2) parts = [ f"[1:v]scale=-2:{art_h}:flags=lanczos[art]", f"[2:v]scale=-2:{badge_h}:flags=lanczos[badge]", f"[0:v][art]overlay=(W-w)/2:{art_overlay_y}:{enable}[v1]", f"[v1][badge]overlay=(W-w)/2:{badge_y}:{enable}", ] # Add song title drawtext below cover art title_escaped = (song_name .replace("\\", "\\\\") .replace("'", "\u2019") .replace('"', '\\"') .replace(":", "\\:") .replace("%", "%%")) title_text = f'\\"{title_escaped}\\" out now!'.lower() parts[-1] += ( f",drawtext=text='{title_text}'" f":fontfile='{font_path}'" f":fontsize=40" f":fontcolor={font_color}" f":x=(w-text_w)/2:y={title_center_y}-text_h/2" f":{enable}" ) # Chain drawtext lyrics filters if has_lyrics: parts[-1] += "," + ",".join(drawtext_filters) filter_chain = ";".join(parts) cmd = [ "ffmpeg", "-y", "-i", str(video_path), "-i", str(cover_art), "-i", str(_SPOTIFY_BADGE), "-filter_complex", filter_chain, "-c:v", "libx264", "-preset", "fast", "-c:a", "copy", str(output_path), ] subprocess.run(cmd, check=True, capture_output=True) else: # Lyrics only, no cover art filter_chain = ",".join(drawtext_filters) subprocess.run([ "ffmpeg", "-y", "-i", str(video_path), "-vf", filter_chain, "-c:v", "libx264", "-preset", "fast", "-c:a", "copy", str(output_path), ], check=True, capture_output=True) def assemble( run_dir: str | Path, audio_path: str | Path | None = None, font_name: str = DEFAULT_FONT, font_color: str = DEFAULT_FONT_COLOR, cover_art: str | Path | None = None, ) -> Path: """Assemble final video with dynamic pacing, clip shuffling, and lyrics. Args: run_dir: Run directory containing clips/, segments.json, drop.json. audio_path: Path to the original audio. Auto-detected if None. font_name: Display name of the font for lyrics overlay. font_color: Hex color for lyrics text (e.g. '#FFF7D4'). cover_art: Path to cover art image. Overlayed from the drop onwards. Returns: Path to the final video file. """ run_dir = Path(run_dir) clips_dir = run_dir / "clips" output_dir = run_dir / "output" output_dir.mkdir(parents=True, exist_ok=True) with open(run_dir / "segments.json") as f: segments = json.load(f) # Load drop time drop_time = None drop_path = run_dir / "drop.json" if drop_path.exists(): with open(drop_path) as f: drop_time = json.load(f).get("drop_time") print(f" Drop at {drop_time:.3f}s") else: print(" No drop detected — using uniform pacing") if audio_path is None: audio_path = _get_audio_path(run_dir) audio_path = Path(audio_path) # --- Step 1: Register clip halves (no pre-splitting needed) --- sub_clips = [] # list of (clip_id, half, path, full_duration) for seg in segments: idx = seg["segment"] clip_path = clips_dir / f"clip_{idx:03d}.mp4" if not clip_path.exists(): print(f" Warning: {clip_path.name} not found, skipping") continue halves = _split_clip(clip_path, idx) sub_clips.append((idx, "first", halves["first"], halves["first_duration"])) sub_clips.append((idx, "second", halves["second"], halves["second_duration"])) print(f" Registered {clip_path.name} ({halves['first_duration']:.1f}s)") if not sub_clips: raise FileNotFoundError(f"No clips found in {clips_dir}") # --- Step 2: Build sub-segment timeline --- sub_segments = _build_sub_segments(segments, drop_time) print(f" Timeline: {len(sub_segments)} slots " f"({len([s for s in sub_segments if s['duration'] < 1.5])} fast cuts)") # --- Step 3: Shuffle sub-clips into slots --- assigned = _shuffle_with_distance(sub_clips.copy(), n_slots=len(sub_segments)) # --- Step 4: Frame-accurate trim of each sub-clip to slot duration --- # Detect FPS from first available sub-clip fps = _get_clip_fps(assigned[0][2]) print(f" Source FPS: {fps}") trimmed_dir = run_dir / "clips_trimmed" trimmed_dir.mkdir(exist_ok=True) trimmed_paths = [] # Get clip dimensions from the first available clip (all clips share resolution) clip_width, clip_height = _get_clip_dimensions(assigned[0][2]) print(f" Clip resolution: {clip_width}x{clip_height}") # Track cumulative frames to prevent drift between cuts and beats cumulative_frames = 0 cumulative_target = 0.0 for i, (sub_seg, (clip_id, half, clip_path, clip_dur)) in enumerate( zip(sub_segments, assigned) ): slot_dur = sub_seg["duration"] cumulative_target += min(slot_dur, clip_dur) target_frame = round(cumulative_target * fps) n_frames = max(1, target_frame - cumulative_frames) cumulative_frames = target_frame # "first" half starts from 0, "second" half starts from end minus slot duration # This makes the two halves show maximally different frames if half == "second": ss = max(0, clip_dur - slot_dur) else: ss = 0 # Apply Ken Burns effect — cycle through effects per slot effect = KEN_BURNS_EFFECTS[i % len(KEN_BURNS_EFFECTS)] vf = _ken_burns_filter(effect, n_frames, clip_width, clip_height) trimmed_path = trimmed_dir / f"slot_{i:03d}.mp4" cmd = [ "ffmpeg", "-y", "-ss", f"{ss:.3f}", "-i", str(clip_path), "-frames:v", str(n_frames), "-vf", vf, "-c:v", "libx264", "-preset", "fast", "-r", str(int(fps)), "-an", str(trimmed_path), ] subprocess.run(cmd, check=True, capture_output=True) trimmed_paths.append(trimmed_path) actual_dur = n_frames / fps print(f" Slot {i}: clip {clip_id} ({half}, ss={ss:.1f}s, {effect}) → " f"{n_frames}f/{actual_dur:.3f}s (target {slot_dur:.3f}s)") # --- Step 5: Concatenate (copy, no re-encode to preserve timing) --- with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False, dir=str(run_dir) ) as f: for p in trimmed_paths: f.write(f"file '{p.resolve()}'\n") concat_list = f.name concat_path = output_dir / "video_only.mp4" subprocess.run([ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_list, "-c", "copy", str(concat_path), ], check=True, capture_output=True) # --- Step 6: Overlay audio --- audio_start = segments[0]["start"] video_duration = cumulative_frames / fps # actual frame-accurate duration with_audio_path = output_dir / "with_audio.mp4" subprocess.run([ "ffmpeg", "-y", "-i", str(concat_path), "-ss", f"{audio_start:.3f}", "-i", str(audio_path), "-t", f"{video_duration:.3f}", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-map", "0:v:0", "-map", "1:a:0", "-shortest", str(with_audio_path), ], check=True, capture_output=True) # --- Step 7: Lyrics + cover art overlay --- overlay_path = output_dir / "with_overlay.mp4" cover_path = Path(cover_art) if cover_art else None song_name = run_dir.parent.name _add_lyrics_overlay(with_audio_path, segments, overlay_path, audio_start, font_name=font_name, font_color=font_color, cover_art=cover_path, drop_time=drop_time, song_name=song_name) # --- Step 8: Crop to exact 9:16 --- final_path = output_dir / "final.mp4" subprocess.run([ "ffmpeg", "-y", "-i", str(overlay_path), "-vf", "crop=2*floor(ih*9/16/2):ih:(iw-2*floor(ih*9/16/2))/2:0", "-c:v", "libx264", "-preset", "fast", "-c:a", "copy", str(final_path), ], check=True, capture_output=True) # Clean up Path(concat_list).unlink(missing_ok=True) print(f"\nFinal video: {final_path}") print(f" Duration: {video_duration:.2f}s") print(f" Slots: {len(sub_segments)} ({len(segments)} original segments)") return final_path def run( run_dir: str | Path, font_name: str = DEFAULT_FONT, font_color: str = DEFAULT_FONT_COLOR, cover_art: str | Path | None = None, ) -> Path: """Assemble final video from clips + audio. Args: run_dir: Run directory (e.g. data/Gone/run_001/). font_name: Display name of the font for lyrics overlay. font_color: Hex color for lyrics text. cover_art: Path to cover art image (optional). Returns: Path to final video. """ print("Assembling final video...") return assemble(run_dir, font_name=font_name, font_color=font_color, cover_art=cover_art) if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python -m src.assembler ") print(" e.g. python -m src.assembler data/Gone/run_001") sys.exit(1) run(sys.argv[1])