| | """FFmpeg video stitching, clip splitting/shuffling, lyrics overlay. |
| | |
| | Takes generated video clips (one per 4-beat segment), splits each into |
| | two halves, shuffles them with a distance constraint, builds a timeline |
| | with dynamic pacing (4-beat cuts before the drop, 2-beat after), overlays |
| | audio and lyrics text. |
| | """ |
| |
|
| | import json |
| | import random |
| | import subprocess |
| | import tempfile |
| | from pathlib import Path |
| |
|
| |
|
| | def _get_audio_path(run_dir: Path) -> Path: |
| | """Find the original audio file one level above the run directory.""" |
| | song_dir = run_dir.parent |
| | for ext in [".wav", ".mp3", ".flac", ".m4a"]: |
| | candidates = list(song_dir.glob(f"*{ext}")) |
| | if candidates: |
| | return candidates[0] |
| | raise FileNotFoundError(f"No audio file found in {song_dir}") |
| |
|
| |
|
def _get_clip_duration(clip_path: Path) -> float:
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Queries the container-level ``format=duration`` entry and parses the
    single value ffprobe prints.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output is not a parseable float.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        str(clip_path),
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    duration_text = probe.stdout.strip()
    return float(duration_text)
| |
|
| |
|
def _get_clip_fps(clip_path: Path) -> float:
    """Return the frame rate of the first video stream, as reported by ffprobe.

    ffprobe prints ``r_frame_rate`` as a ``numerator/denominator`` pair; the
    two integers are divided to obtain frames per second.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If the output is not exactly ``<int>/<int>``.
        ZeroDivisionError: If the reported denominator is zero.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=r_frame_rate",
        "-of", "csv=p=0",
        str(clip_path),
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    numerator, denominator = probe.stdout.strip().split("/")
    return int(numerator) / int(denominator)
| |
|
| |
|
def _trim_clip(clip_path: Path, start: float, duration: float, output_path: Path):
    """Cut ``duration`` seconds out of a clip, starting at ``start``, via FFmpeg.

    The excerpt is re-encoded with libx264 (preset "fast") and written to
    ``output_path`` without an audio track; an existing output file is
    overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # -ss is placed before -i, i.e. it is passed as an input option.
    input_args = ["-ss", f"{start:.3f}", "-i", str(clip_path)]
    output_args = [
        "-t", f"{duration:.3f}",
        "-c:v", "libx264", "-preset", "fast",
        "-an",
        str(output_path),
    ]
    subprocess.run(
        ["ffmpeg", "-y", *input_args, *output_args],
        check=True,
        capture_output=True,
    )
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
| | _KB_ZOOM = 0.45 |
| |
|
| | KEN_BURNS_EFFECTS = [ |
| | "zoom_in", |
| | "zoom_out", |
| | ] |
| |
|
| |
|
| | def _ken_burns_filter( |
| | effect: str, n_frames: int, width: int, height: int, |
| | ) -> str: |
| | """Build an FFmpeg filter for a smooth Ken Burns zoom effect on video. |
| | |
| | Upscales the video 4x before applying zoompan with d=1 (one output |
| | frame per input frame), then scales back to original size. The 4x |
| | upscale makes integer rounding in zoompan negligible, eliminating |
| | visible jitter. |
| | """ |
| | z = _KB_ZOOM |
| | N = max(n_frames, 1) |
| | W, H = width, height |
| | |
| | UP = 8 |
| | UW, UH = W * UP, H * UP |
| |
|
| | if effect == "zoom_in": |
| | zoom_expr = f"1+{z}*on/{N}" |
| | elif effect == "zoom_out": |
| | zoom_expr = f"1+{z}-{z}*on/{N}" |
| | else: |
| | return f"scale={W}:{H}" |
| |
|
| | return ( |
| | f"scale={UW}:{UH}:flags=lanczos," |
| | f"zoompan=z='{zoom_expr}':" |
| | f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':" |
| | f"d=1:s={UW}x{UH}," |
| | f"scale={W}:{H}:flags=lanczos" |
| | ) |
| |
|
| |
|
def _get_clip_dimensions(clip_path: Path) -> tuple[int, int]:
    """Return ``(width, height)`` of the first video stream, via ffprobe.

    ffprobe is asked for ``stream=width,height`` with ``x`` as the CSV
    separator, so its output has the form ``<width>x<height>``.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If the output does not split into exactly two integers.
    """
    probe_cmd = [
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=s=x:p=0", str(clip_path),
    ]
    probe = subprocess.run(
        probe_cmd, capture_output=True, text=True, check=True,
    )
    width_text, height_text = probe.stdout.strip().split("x")
    return int(width_text), int(height_text)
| |
|
| |
|
def _split_clip(clip_path: Path, clip_id: int) -> dict:
    """Describe a clip's "first" and "second" halves without cutting any file.

    Both halves point at the same source file and carry the clip's full
    duration; no midpoint is fixed here. Which part of the clip a half
    actually shows is decided later, when the slot is trimmed (the "first"
    half is read from the start, the "second" half from the end).

    Returns:
        Dict with keys ``clip_id``, ``first``, ``second``,
        ``first_duration`` and ``second_duration``.
    """
    full_length = _get_clip_duration(clip_path)

    entry = {"clip_id": clip_id}
    entry.update(dict.fromkeys(("first", "second"), clip_path))
    entry.update(dict.fromkeys(("first_duration", "second_duration"), full_length))
    return entry
| |
|
| |
|
| | def _build_sub_segments(segments: list[dict], drop_time: float | None) -> list[dict]: |
| | """Build the final timeline of sub-segments. |
| | |
| | Before the drop: one slot per 4-beat segment. |
| | After the drop: each 4-beat segment splits into two 2-beat slots |
| | using the beat timestamps stored in the segment. |
| | """ |
| | sub_segments = [] |
| |
|
| | for seg in segments: |
| | beats = seg.get("beats", [seg["start"], seg["end"]]) |
| | is_after_drop = drop_time is not None and seg["start"] >= drop_time |
| |
|
| | if is_after_drop and len(beats) >= 3: |
| | |
| | mid_idx = len(beats) // 2 |
| | mid_time = beats[mid_idx] |
| |
|
| | sub_segments.append({ |
| | "start": seg["start"], |
| | "end": mid_time, |
| | "duration": round(mid_time - seg["start"], 3), |
| | "lyrics": seg.get("lyrics", ""), |
| | "parent_segment": seg["segment"], |
| | }) |
| | sub_segments.append({ |
| | "start": mid_time, |
| | "end": seg["end"], |
| | "duration": round(seg["end"] - mid_time, 3), |
| | "lyrics": "", |
| | "parent_segment": seg["segment"], |
| | }) |
| | else: |
| | |
| | sub_segments.append({ |
| | "start": seg["start"], |
| | "end": seg["end"], |
| | "duration": seg["duration"], |
| | "lyrics": seg.get("lyrics", ""), |
| | "parent_segment": seg["segment"], |
| | }) |
| |
|
| | return sub_segments |
| |
|
| |
|
| | def _shuffle_with_distance(pool: list[tuple], n_slots: int) -> list[tuple]: |
| | """Select n_slots sub-clips maximising clip diversity and spacing. |
| | |
| | Shuffles clip IDs once, then repeats that order to fill all slots. |
| | First pass uses "first" halves, second pass uses "second" halves. |
| | Same clip is always exactly n_clips positions apart — maximum spacing. |
| | |
| | Each item is (clip_id, half_label, path, duration). |
| | """ |
| | by_clip: dict[int, list[tuple]] = {} |
| | for item in pool: |
| | by_clip.setdefault(item[0], []).append(item) |
| |
|
| | clip_ids = list(by_clip.keys()) |
| | random.shuffle(clip_ids) |
| |
|
| | |
| | result = [] |
| | cycle = 0 |
| | while len(result) < n_slots: |
| | for cid in clip_ids: |
| | if len(result) >= n_slots: |
| | break |
| | halves = by_clip[cid] |
| | |
| | half_idx = cycle % len(halves) |
| | result.append(halves[half_idx]) |
| | cycle += 1 |
| |
|
| | return result |
| |
|
| |
|
| | |
| | FONTS = { |
| | "Bebas Neue": "BebasNeue-Regular.ttf", |
| | "Teko": "Teko-Bold.ttf", |
| | "Russo One": "RussoOne-Regular.ttf", |
| | "Staatliches": "Staatliches-Regular.ttf", |
| | } |
| |
|
| | DEFAULT_FONT = "Bebas Neue" |
| | DEFAULT_FONT_COLOR = "#FFF7D4" |
| |
|
| | _FONTS_DIR = Path(__file__).resolve().parent.parent / "fonts" |
| |
|
| |
|
| | def font_names() -> list[str]: |
| | """Return list of available font display names.""" |
| | return list(FONTS.keys()) |
| |
|
| |
|
| | def _get_font_path(font_name: str) -> Path: |
| | """Resolve a font display name to its .ttf file path.""" |
| | filename = FONTS.get(font_name, FONTS[DEFAULT_FONT]) |
| | return _FONTS_DIR / filename |
| |
|
| |
|
# Badge image overlaid together with the cover art; resolved relative to the
# directory two levels above this file.
_SPOTIFY_BADGE = Path(__file__).resolve().parent.parent / "assets" / "spotify_badge.png"


def _add_lyrics_overlay(
    video_path: Path,
    segments: list[dict],
    output_path: Path,
    audio_offset: float,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: Path | None = None,
    drop_time: float | None = None,
    song_name: str = "",
) -> None:
    """Add lyrics text and optional cover art overlay using FFmpeg filters.

    Each word is drawn centred on screen for the time span given by its
    timestamps. When both ``cover_art`` and ``drop_time`` are given, lyrics
    stop at the drop and, from the drop onwards, the cover art, the badge
    image and a '"<song_name>" out now!' title line are shown instead.
    If there is nothing to draw, the input is stream-copied to the output.

    Args:
        video_path: Input video (its audio stream is copied through).
        segments: Segment dicts; words are read from each segment's optional
            ``words`` list (items with ``word``, ``start``, ``end``).
        output_path: Destination file; overwritten if present.
        audio_offset: Subtracted from word and drop timestamps to convert
            them to positions on the video timeline.
        font_name: Display name of the font (see ``FONTS``).
        font_color: Value passed to drawtext's ``fontcolor``.
        cover_art: Cover image; only used when ``drop_time`` is also set.
        drop_time: Drop timestamp, on the same clock as the word timestamps.
        song_name: Title shown in the "out now!" line.

    Raises:
        subprocess.CalledProcessError: If ffmpeg or ffprobe exits non-zero.
    """
    font_path = _get_font_path(font_name)

    # Lyrics are only cut off at the drop when the cover-art overlay takes
    # over from there; otherwise they run for the whole video.
    lyrics_cutoff = None
    if cover_art is not None and drop_time is not None:
        lyrics_cutoff = drop_time

    # Flatten all segments into one list of lower-cased words with timings.
    all_words = []
    for seg in segments:
        for word_info in seg.get("words", []):
            word = word_info["word"].strip().lower()
            if not word:
                continue
            w_start = word_info["start"]
            w_end = word_info["end"]
            # Drop words that begin at or after the cutoff entirely.
            if lyrics_cutoff is not None and w_start >= lyrics_cutoff:
                continue
            # Clip a word that straddles the cutoff so it ends at the cutoff.
            if lyrics_cutoff is not None and w_end > lyrics_cutoff:
                w_end = lyrics_cutoff
            all_words.append({"word": word, "start": w_start, "end": w_end})

    # Close short gaps (under gap_threshold seconds) between neighbouring
    # words by extending both to meet at the midpoint of the gap.
    gap_threshold = 0.5
    for i in range(len(all_words) - 1):
        gap = all_words[i + 1]["start"] - all_words[i]["end"]
        if 0 < gap < gap_threshold:
            mid = all_words[i]["end"] + gap / 2
            all_words[i]["end"] = mid
            all_words[i + 1]["start"] = mid

    # One drawtext filter per word, enabled only during the word's time span.
    drawtext_filters = []
    for w in all_words:
        # Escape characters that are special in the filter string. A straight
        # apostrophe is swapped for a typographic one (U+2019) so it cannot
        # terminate the single-quoted text value.
        # NOTE(review): drawtext's text expansion treats '%' specially —
        # verify that "%%" renders as a literal percent sign.
        escaped = (w["word"]
                   .replace("\\", "\\\\")
                   .replace("'", "\u2019")
                   .replace('"', '\\"')
                   .replace(":", "\\:")
                   .replace("%", "%%")
                   .replace("[", "\\[")
                   .replace("]", "\\]"))

        # Convert word times to positions on the video timeline.
        start = w["start"] - audio_offset
        end = w["end"] - audio_offset

        drawtext_filters.append(
            f"drawtext=text='{escaped}'"
            f":fontfile='{font_path}'"
            f":fontsize=36"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y=(h-text_h)/2"
            f":enable='between(t,{start:.3f},{end:.3f})'"
        )

    has_cover = cover_art is not None and drop_time is not None
    has_lyrics = len(drawtext_filters) > 0

    # Nothing to draw: copy the streams unchanged instead of re-encoding.
    if not has_cover and not has_lyrics:
        subprocess.run([
            "ffmpeg", "-y", "-i", str(video_path),
            "-c", "copy", str(output_path),
        ], check=True, capture_output=True)
        return

    if has_cover:
        # All cover-related elements become visible from the drop onwards.
        drop_start = drop_time - audio_offset
        enable = f"enable='gte(t,{drop_start:.3f})'"

        # Layout constants in pixels: scaled heights of the cover art and
        # badge, and how far the art's centre sits below the frame centre.
        art_h = 270
        art_y_offset = 10
        badge_h = 56

        # The layout is computed from the video height, probed here.
        vid_h = int(subprocess.run([
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=height", "-of", "csv=p=0",
            str(video_path),
        ], capture_output=True, text=True, check=True).stdout.strip())
        art_center = vid_h / 2 + art_y_offset
        art_top = art_center - art_h / 2
        art_bottom = art_center + art_h / 2

        # Vertically centred band of height vid_h * 9 / 16.
        # NOTE(review): presumably this matches the width of the 9:16 crop
        # applied to the final video, i.e. a centred square — confirm.
        sq_side = vid_h * 9 / 16
        sq_top = (vid_h - sq_side) / 2
        sq_bottom = (vid_h + sq_side) / 2

        # Badge: centred vertically between the band's top and the art's top.
        badge_center_y = (sq_top + art_top) / 2
        badge_y = int(badge_center_y - badge_h / 2)

        # Title: centred vertically between the art's bottom and the band's bottom.
        title_center_y = int((art_bottom + sq_bottom) / 2)

        art_overlay_y = int(art_center - art_h / 2)

        # Inputs: 0 = video, 1 = cover art, 2 = badge. Both images are scaled
        # to a fixed height and overlaid horizontally centred.
        parts = [
            f"[1:v]scale=-2:{art_h}:flags=lanczos[art]",
            f"[2:v]scale=-2:{badge_h}:flags=lanczos[badge]",
            f"[0:v][art]overlay=(W-w)/2:{art_overlay_y}:{enable}[v1]",
            f"[v1][badge]overlay=(W-w)/2:{badge_y}:{enable}",
        ]

        # Title line, escaped like the lyrics (minus the bracket escapes)
        # and lower-cased as a whole.
        title_escaped = (song_name
                         .replace("\\", "\\\\")
                         .replace("'", "\u2019")
                         .replace('"', '\\"')
                         .replace(":", "\\:")
                         .replace("%", "%%"))
        title_text = f'\\"{title_escaped}\\" out now!'.lower()
        parts[-1] += (
            f",drawtext=text='{title_text}'"
            f":fontfile='{font_path}'"
            f":fontsize=40"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y={title_center_y}-text_h/2"
            f":{enable}"
        )

        # Lyrics filters are chained onto the end of the last overlay stage.
        if has_lyrics:
            parts[-1] += "," + ",".join(drawtext_filters)
        filter_chain = ";".join(parts)

        cmd = [
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-i", str(cover_art),
            "-i", str(_SPOTIFY_BADGE),
            "-filter_complex", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
    else:
        # Lyrics only: a simple single-input filter chain is enough.
        filter_chain = ",".join(drawtext_filters)
        subprocess.run([
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-vf", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ], check=True, capture_output=True)
| |
|
| |
|
def assemble(
    run_dir: str | Path,
    audio_path: str | Path | None = None,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Assemble final video with dynamic pacing, clip shuffling, and lyrics.

    Pipeline: register clips -> build the slot timeline -> assign clips to
    slots -> trim each slot (with a Ken Burns zoom) -> concatenate -> mux
    audio -> overlay lyrics/cover art -> crop to 9:16.

    Args:
        run_dir: Run directory containing clips/, segments.json, drop.json.
        audio_path: Path to the original audio. Auto-detected if None.
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text (e.g. '#FFF7D4').
        cover_art: Path to cover art image. Overlayed from the drop onwards.

    Returns:
        Path to the final video file.

    Raises:
        FileNotFoundError: If no clip referenced by segments.json exists, or
            no audio file can be auto-detected.
        subprocess.CalledProcessError: If an ffmpeg/ffprobe call fails.
    """
    run_dir = Path(run_dir)
    clips_dir = run_dir / "clips"
    output_dir = run_dir / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    # JSON is UTF-8 by specification; don't depend on the locale encoding
    # (segments may carry non-ASCII lyrics).
    with open(run_dir / "segments.json", encoding="utf-8") as f:
        segments = json.load(f)

    # The drop is optional: drop.json may be absent, or present without a
    # usable "drop_time". Both cases mean uniform pacing.
    drop_time = None
    drop_path = run_dir / "drop.json"
    if drop_path.exists():
        with open(drop_path, encoding="utf-8") as f:
            drop_time = json.load(f).get("drop_time")
    if drop_time is not None:
        print(f" Drop at {drop_time:.3f}s")
    else:
        print(" No drop detected — using uniform pacing")

    if audio_path is None:
        audio_path = _get_audio_path(run_dir)
    audio_path = Path(audio_path)

    # Register both halves of every clip that exists on disk.
    sub_clips = []
    for seg in segments:
        idx = seg["segment"]
        clip_path = clips_dir / f"clip_{idx:03d}.mp4"
        if not clip_path.exists():
            print(f" Warning: {clip_path.name} not found, skipping")
            continue

        halves = _split_clip(clip_path, idx)
        sub_clips.append((idx, "first", halves["first"], halves["first_duration"]))
        sub_clips.append((idx, "second", halves["second"], halves["second_duration"]))
        print(f" Registered {clip_path.name} ({halves['first_duration']:.1f}s)")

    if not sub_clips:
        raise FileNotFoundError(f"No clips found in {clips_dir}")

    # Timeline of slots (segments after the drop are split in two).
    sub_segments = _build_sub_segments(segments, drop_time)
    fast_cuts = sum(1 for s in sub_segments if s["duration"] < 1.5)
    print(f" Timeline: {len(sub_segments)} slots ({fast_cuts} fast cuts)")

    # One (clip_id, half, path, duration) item per slot.
    assigned = _shuffle_with_distance(sub_clips.copy(), n_slots=len(sub_segments))

    # Frame rate and resolution are probed from the first assigned clip and
    # applied to every slot.
    fps = _get_clip_fps(assigned[0][2])
    print(f" Source FPS: {fps}")
    # Frame counts below are computed from the exact rate, so the encoder
    # must be given the same rate: truncating e.g. 29.97 to 29 would stretch
    # every slot and drift against the audio. Integer rates keep their
    # plain integer form.
    fps_arg = str(int(fps)) if float(fps).is_integer() else f"{fps:.6f}"

    trimmed_dir = run_dir / "clips_trimmed"
    trimmed_dir.mkdir(exist_ok=True)
    trimmed_paths = []

    clip_width, clip_height = _get_clip_dimensions(assigned[0][2])
    print(f" Clip resolution: {clip_width}x{clip_height}")

    # Slot lengths are derived from the *cumulative* target time rounded to
    # whole frames, so per-slot rounding errors do not accumulate.
    cumulative_frames = 0
    cumulative_target = 0.0

    for i, (sub_seg, (clip_id, half, clip_path, clip_dur)) in enumerate(
        zip(sub_segments, assigned)
    ):
        slot_dur = sub_seg["duration"]
        # A clip shorter than its slot can only contribute its own length.
        cumulative_target += min(slot_dur, clip_dur)
        target_frame = round(cumulative_target * fps)
        n_frames = max(1, target_frame - cumulative_frames)
        cumulative_frames = target_frame

        # "second" halves are taken from the end of the clip (one slot
        # length before the end); "first" halves from the very start.
        ss = max(0, clip_dur - slot_dur) if half == "second" else 0

        # Alternate the available zoom effects from slot to slot.
        effect = KEN_BURNS_EFFECTS[i % len(KEN_BURNS_EFFECTS)]
        vf = _ken_burns_filter(effect, n_frames, clip_width, clip_height)

        trimmed_path = trimmed_dir / f"slot_{i:03d}.mp4"
        cmd = [
            "ffmpeg", "-y",
            "-ss", f"{ss:.3f}",
            "-i", str(clip_path),
            "-frames:v", str(n_frames),
            "-vf", vf,
            "-c:v", "libx264", "-preset", "fast",
            "-r", fps_arg,
            "-an",
            str(trimmed_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        trimmed_paths.append(trimmed_path)
        actual_dur = n_frames / fps
        print(f" Slot {i}: clip {clip_id} ({half}, ss={ss:.1f}s, {effect}) → "
              f"{n_frames}f/{actual_dur:.3f}s (target {slot_dur:.3f}s)")

    concat_list = None
    try:
        # Concat demuxer list file. It is created with delete=False because
        # ffmpeg must reopen it by name; the finally-clause removes it even
        # when one of the ffmpeg steps below fails.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".txt", delete=False, dir=str(run_dir),
            encoding="utf-8",
        ) as f:
            concat_list = f.name
            for p in trimmed_paths:
                # A single quote cannot appear inside a single-quoted entry:
                # close the quote, emit an escaped quote, reopen.
                quoted = str(p.resolve()).replace("'", "'\\''")
                f.write(f"file '{quoted}'\n")

        concat_path = output_dir / "video_only.mp4"
        subprocess.run([
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0",
            "-i", concat_list,
            "-c", "copy",
            str(concat_path),
        ], check=True, capture_output=True)

        # Audio starts where the first segment starts and is limited to the
        # length of the assembled video.
        audio_start = segments[0]["start"]
        video_duration = cumulative_frames / fps

        with_audio_path = output_dir / "with_audio.mp4"
        subprocess.run([
            "ffmpeg", "-y",
            "-i", str(concat_path),
            "-ss", f"{audio_start:.3f}",
            "-i", str(audio_path),
            "-t", f"{video_duration:.3f}",
            "-c:v", "copy",
            "-c:a", "aac", "-b:a", "192k",
            "-map", "0:v:0", "-map", "1:a:0",
            "-shortest",
            str(with_audio_path),
        ], check=True, capture_output=True)

        # Lyrics / cover-art overlay; the song name is the name of the
        # directory that contains the run directory.
        overlay_path = output_dir / "with_overlay.mp4"
        cover_path = Path(cover_art) if cover_art else None
        song_name = run_dir.parent.name
        _add_lyrics_overlay(with_audio_path, segments, overlay_path, audio_start,
                            font_name=font_name, font_color=font_color,
                            cover_art=cover_path, drop_time=drop_time,
                            song_name=song_name)

        # Centre-crop to 9:16; the crop width is forced to an even number.
        final_path = output_dir / "final.mp4"
        subprocess.run([
            "ffmpeg", "-y",
            "-i", str(overlay_path),
            "-vf", "crop=2*floor(ih*9/16/2):ih:(iw-2*floor(ih*9/16/2))/2:0",
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(final_path),
        ], check=True, capture_output=True)
    finally:
        if concat_list is not None:
            Path(concat_list).unlink(missing_ok=True)

    print(f"\nFinal video: {final_path}")
    print(f" Duration: {video_duration:.2f}s")
    print(f" Slots: {len(sub_segments)} ({len(segments)} original segments)")
    return final_path
| |
|
| |
|
def run(
    run_dir: str | Path,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Announce the assembly step, then build the final video.

    Thin wrapper around ``assemble`` that prints a progress line first and
    leaves the audio path to auto-detection.

    Args:
        run_dir: Run directory (e.g. data/Gone/run_001/).
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text.
        cover_art: Path to cover art image (optional).

    Returns:
        Path to final video.
    """
    print("Assembling final video...")
    overlay_options = {
        "font_name": font_name,
        "font_color": font_color,
        "cover_art": cover_art,
    }
    return assemble(run_dir, **overlay_options)
| |
|
| |
|
if __name__ == "__main__":
    import sys

    # Command-line entry point: exactly one positional argument is used,
    # the run directory.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python -m src.assembler <run_dir>")
        print(" e.g. python -m src.assembler data/Gone/run_001")
        sys.exit(1)

    run(cli_args[0])
| |
|