"""SyncAI — AI Music Video Generator. Gradio app that orchestrates the full pipeline: Song → Stems → Lyrics + Beats → Segments → Prompts → Images → Video → Assembly Works locally (fal.ai API for video) and on HuggingFace Spaces (on-device Wan 2.1). """ import json import os import shutil from pathlib import Path from dotenv import load_dotenv load_dotenv() import gradio as gr import torch # Lightweight imports only — heavy modules (whisperx, madmom, etc.) # are lazy-imported inside generate() to keep the UI responsive. from src.assembler import font_names, DEFAULT_FONT, DEFAULT_FONT_COLOR from src.styles import style_names, get_style # --------------------------------------------------------------------------- # Environment detection # --------------------------------------------------------------------------- IS_SPACES = os.getenv("SPACE_ID") is not None if IS_SPACES: import spaces INPUT_DIR = Path("input") INPUT_DIR.mkdir(exist_ok=True) # --------------------------------------------------------------------------- # GPU-accelerated steps (decorated only on Spaces) # --------------------------------------------------------------------------- def _gpu_generate_images(run_dir, style_name): """GPU phase: generate all images.""" print(f"[GPU] Generating images (run_dir={run_dir}, style={style_name})") if IS_SPACES: from src.image_generator_hf import run as gen_images else: from src.image_generator_api import run as gen_images gen_images(run_dir, style_name=style_name) torch.cuda.empty_cache() print("[GPU] Image generation complete.") def _gpu_generate_one_video(run_dir, segment_idx, prompt, negative_prompt, seed): """GPU phase: generate a single video clip. Each call gets a fresh ZeroGPU token.""" import time run_dir = Path(run_dir) image_path = run_dir / "images" / f"segment_{segment_idx:03d}.png" clip_path = run_dir / "clips" / f"clip_{segment_idx:03d}.mp4" clip_path.parent.mkdir(parents=True, exist_ok=True) if clip_path.exists(): print(f" [GPU] Clip {segment_idx}: already exists, skipping") return if not image_path.exists(): print(f" [GPU] Clip {segment_idx}: image not found, skipping") return if IS_SPACES: from src.video_generator_hf import generate_clip else: from src.video_generator_api import generate_clip print(f" [GPU] Generating clip {segment_idx}...") t0 = time.time() generate_clip(image_path, prompt, clip_path, negative_prompt, seed=seed) print(f" [GPU] Clip {segment_idx} done ({time.time() - t0:.1f}s)") # Apply @spaces.GPU decorator on Spaces — each gets a fresh token if IS_SPACES: _gpu_generate_images = spaces.GPU(duration=300)(_gpu_generate_images) _gpu_generate_one_video = spaces.GPU(duration=600)(_gpu_generate_one_video) # --------------------------------------------------------------------------- # Run discovery & step detection # --------------------------------------------------------------------------- DATA_DIR = Path("data") STEPS = [ "1. Stems", "2. Lyrics", "3. Beats", "4. Segmentation", "5. Prompts", "6. Images", "7. Videos", "8. Assembly", ] def _list_runs() -> list[str]: """Find all existing run directories under data/.""" if not DATA_DIR.exists(): return [] runs = [] for song_dir in sorted(DATA_DIR.iterdir()): if not song_dir.is_dir(): continue for run_dir in sorted(song_dir.glob("run_*")): if run_dir.is_dir(): runs.append(f"{song_dir.name}/{run_dir.name}") return runs def _detect_completed_steps(run_dir: Path) -> int: """Return the number of the last fully completed step (0 = nothing done).""" # Step 1: vocals + drums stems exist (LALAL.AI only extracts these two) stems = run_dir / "stems" for name in ["drums.wav", "vocals.wav"]: if not (stems / name).exists(): return 0 # Step 2: lyrics.json valid with at least 1 entry lyrics_path = run_dir / "lyrics.json" if not lyrics_path.exists(): return 1 try: data = json.loads(lyrics_path.read_text()) if not isinstance(data, list) or len(data) == 0: return 1 except (json.JSONDecodeError, OSError): return 1 # Step 3: beats.json valid with at least 1 entry beats_path = run_dir / "beats.json" if not beats_path.exists(): return 2 try: data = json.loads(beats_path.read_text()) if not isinstance(data, list) or len(data) == 0: return 2 except (json.JSONDecodeError, OSError): return 2 # Step 4: segments.json valid with at least 1 segment having start/end seg_path = run_dir / "segments.json" if not seg_path.exists(): return 3 try: segments = json.loads(seg_path.read_text()) if not isinstance(segments, list) or len(segments) == 0: return 3 if "start" not in segments[0] or "end" not in segments[0]: return 3 except (json.JSONDecodeError, OSError): return 3 # Step 5: every segment has a non-empty "prompt" key try: if not all(seg.get("prompt") for seg in segments): return 4 except Exception: return 4 n_segments = len(segments) # Step 6: exactly N image files exist for i in range(1, n_segments + 1): if not (run_dir / "images" / f"segment_{i:03d}.png").exists(): return 5 # Step 7: exactly N clip files exist for i in range(1, n_segments + 1): if not (run_dir / "clips" / f"clip_{i:03d}.mp4").exists(): return 6 # Step 8: final.mp4 exists with size > 0 final = run_dir / "output" / "final.mp4" if not final.exists() or final.stat().st_size == 0: return 7 return 8 def _get_startable_steps(run_dir: Path) -> list[str]: """Return step names the user can start from (all prerequisites met).""" completed = _detect_completed_steps(run_dir) # Can start from any step up to completed+1 (the next incomplete step) last_startable = min(completed + 1, 8) return STEPS[:last_startable] # steps 1 through last_startable def _on_run_mode_change(run_mode): """Toggle visibility of audio upload vs resume controls.""" is_resume = run_mode == "Resume Existing" return ( gr.update(visible=not is_resume), # audio_input gr.update(visible=is_resume, choices=_list_runs()), # existing_run gr.update(visible=is_resume, choices=[], value=None), # start_step gr.update(visible=is_resume), # reuse_files ) def _on_run_selected(existing_run): """Update step dropdown when a run is selected.""" if not existing_run: return gr.update(choices=[], value=None) run_dir = DATA_DIR / existing_run steps = _get_startable_steps(run_dir) default = steps[-1] if steps else None return gr.update(choices=steps, value=default) # --------------------------------------------------------------------------- # Main pipeline # --------------------------------------------------------------------------- _COLOR_PRESETS = { "Warm White": "#FFF7D4", "White": "#FFFFFF", "Red": "#FF3B30", "Cyan": "#00E5FF", "Gold": "#FFD700", "Custom": None, } def generate_cpu(audio_file: str, style_name: str, cover_art: str | None, run_mode: str, existing_run: str | None, start_step: str | None, reuse_files: bool, progress=gr.Progress()): """CPU phase: steps 1-5 (stems, lyrics, beats, segmentation, prompts). Returns state dict for the GPU phases. """ style = get_style(style_name) is_resume = run_mode == "Resume Existing" if is_resume: if not existing_run: raise gr.Error("Please select an existing run.") if not start_step: raise gr.Error("Please select a step to start from.") run_dir = DATA_DIR / existing_run if not run_dir.exists(): raise gr.Error(f"Run directory not found: {run_dir}") step_num = int(start_step.split(".")[0]) print(f"Resuming {existing_run} from step {step_num}") # Always clear assembly output (cheap to redo) out_dir = run_dir / "output" if out_dir.exists(): shutil.rmtree(out_dir) for d in ["clips_split", "clips_trimmed"]: p = run_dir / d if p.exists(): shutil.rmtree(p) if not reuse_files: if step_num <= 6: img_dir = run_dir / "images" if img_dir.exists(): shutil.rmtree(img_dir) if step_num <= 7: clips_dir = run_dir / "clips" if clips_dir.exists(): shutil.rmtree(clips_dir) else: if audio_file is None: raise gr.Error("Please upload a song first.") step_num = 1 import gc def _flush_memory(): gc.collect() if hasattr(torch, "mps") and torch.backends.mps.is_available(): torch.mps.empty_cache() if torch.cuda.is_available(): torch.cuda.empty_cache() # --- Step 1: Stem Separation --- if step_num <= 1: progress(0.0, desc="Separating stems...") from src.stem_separator import separate_stems if is_resume: song_dir = run_dir.parent audio_candidates = list(song_dir.glob("*.wav")) + list(song_dir.glob("*.mp3")) + \ list(song_dir.glob("*.flac")) + list(song_dir.glob("*.m4a")) if not audio_candidates: raise gr.Error(f"No audio file found in {song_dir}") result = separate_stems(audio_candidates[0], output_dir=run_dir / "stems") else: result = separate_stems(Path(audio_file)) run_dir = result["run_dir"] print(f"Run directory: {run_dir}") # --- Step 2: Lyrics Extraction --- if step_num <= 2: progress(0.15, desc="Extracting lyrics...") from src.lyrics_extractor import extract_lyrics vocals_path = run_dir / "stems" / "vocals.wav" extract_lyrics(vocals_path) del extract_lyrics _flush_memory() # --- Step 3: Beat Detection --- if step_num <= 3: progress(0.25, desc="Detecting beats...") from src.beat_detector import run as detect_beats drums_path = run_dir / "stems" / "drums.wav" detect_beats(drums_path) del detect_beats _flush_memory() # --- Step 4: Segmentation --- if step_num <= 4: progress(0.35, desc="Segmenting lyrics to beats...") from src.segmenter import run as segment_lyrics segment_lyrics(run_dir) # --- Step 5: Prompt Generation --- if step_num <= 5: progress(0.40, desc="Generating prompts...") from src.prompt_generator import run as generate_prompts generate_prompts(run_dir, style_description=style["description"], image_prompt_guidance=style.get("image_prompt_guidance", ""), quality_suffix=style.get("quality_suffix", "")) print("Prompt generation complete.") progress(0.45, desc="CPU steps done, requesting GPU...") # Return state for GPU phases (all values must be picklable strings) # gr.update() for video_output keeps it unchanged but gives progress bar a visible target return gr.update(), str(run_dir), style_name, str(step_num), cover_art or "" def generate_images(video_out, run_dir_str: str, style_name: str, step_num_str: str, cover_art: str, progress=gr.Progress()): """GPU phase: step 6 — generate images. Gets a fresh ZeroGPU token.""" step_num = int(step_num_str) if step_num <= 7: progress(0.50, desc="Generating images...") _gpu_generate_images(run_dir_str, style_name) return gr.update(), run_dir_str, style_name, step_num_str, cover_art def generate_videos(video_out, run_dir_str: str, style_name: str, step_num_str: str, cover_art: str, progress=gr.Progress()): """GPU phase: step 7 — generate video clips, one per GPU session.""" step_num = int(step_num_str) if step_num <= 7: run_dir = Path(run_dir_str) with open(run_dir / "segments.json") as f: segments = json.load(f) seed = 42 for i, seg in enumerate(segments): idx = seg["segment"] prompt = seg.get("video_prompt", seg.get("scene", seg.get("prompt", ""))) neg = seg.get("negative_prompt", "") progress(0.50 + 0.35 * (i / len(segments)), desc=f"Generating video clip {i+1}/{len(segments)}...") _gpu_generate_one_video(run_dir_str, idx, prompt, neg, seed + idx) # Unload video model after all clips are done if IS_SPACES: try: from src.video_generator_hf import unload unload() except Exception: pass print(f"All {len(segments)} video clips generated.") return gr.update(), run_dir_str, cover_art def generate_assembly(run_dir_str: str, cover_art: str, progress=gr.Progress()): """CPU phase: step 8 — assemble final video.""" run_dir = Path(run_dir_str) cover = cover_art if cover_art else None progress(0.90, desc="Assembling final video...") from src.assembler import run as assemble_video final_path = assemble_video(run_dir, font_name=DEFAULT_FONT, font_color=DEFAULT_FONT_COLOR, cover_art=cover) progress(1.0, desc="Done!") return str(final_path), run_dir_str, gr.update(interactive=True) def reshuffle(run_dir_str: str, cover_art: str | None, progress=gr.Progress()): """Re-run only the assembly step with a new random shuffle.""" if not run_dir_str: raise gr.Error("No previous run to reshuffle. Generate a video first.") run_dir = Path(run_dir_str) if not run_dir.exists(): raise gr.Error(f"Run directory not found: {run_dir}") font_name = DEFAULT_FONT font_color = DEFAULT_FONT_COLOR # Clear assembly artifacts for d in ["clips_trimmed", "output"]: p = run_dir / d if p.exists(): shutil.rmtree(p) progress(0.2, desc="Reshuffling and assembling...") from src.assembler import run as assemble_video final_path = assemble_video(run_dir, font_name=font_name, font_color=font_color, cover_art=cover_art) progress(1.0, desc="Done!") return str(final_path) # --------------------------------------------------------------------------- # Gradio UI # --------------------------------------------------------------------------- _custom_css = """ /* Load Google Fonts for dropdown preview */ @import url('https://fonts.googleapis.com/css2?family=Bebas+Neue&family=Teko:wght@700&family=Russo+One&family=Staatliches&display=swap'); /* Style font dropdown options in their actual font */ #font-dropdown [data-value="Bebas Neue"], #font-dropdown li:nth-child(1) { font-family: 'Bebas Neue', sans-serif !important; } #font-dropdown [data-value="Teko"], #font-dropdown li:nth-child(2) { font-family: 'Teko', sans-serif !important; font-weight: 700 !important; } #font-dropdown [data-value="Russo One"], #font-dropdown li:nth-child(3) { font-family: 'Russo One', sans-serif !important; } #font-dropdown [data-value="Staatliches"], #font-dropdown li:nth-child(4) { font-family: 'Staatliches', sans-serif !important; } #font-dropdown ul li { font-size: 16px !important; } /* Remove white border on color picker */ input[type="color"], input[type="color"]:focus, input[type="color"]:hover, .gr-color-picker input, div[data-testid="color-picker"] input, div[data-testid="color-picker"] div, .color-picker input { border: none !important; outline: none !important; box-shadow: none !important; background: transparent !important; } /* Color swatch buttons */ .color-swatch { min-width: 36px !important; max-width: 36px !important; height: 36px !important; padding: 0 !important; border-radius: 6px !important; border: 2px solid transparent !important; cursor: pointer !important; box-shadow: none !important; transition: border-color 0.15s ease !important; } .color-swatch:hover { border-color: rgba(255,255,255,0.5) !important; } .color-swatch.selected { border-color: #fff !important; } #swatch-0 { background: #FFF7D4 !important; } #swatch-1 { background: #FFFFFF !important; } #swatch-2 { background: #FF3B30 !important; } #swatch-3 { background: #00E5FF !important; } #swatch-4 { background: #FFD700 !important; } #swatch-custom { background: conic-gradient(red, yellow, lime, aqua, blue, magenta, red); min-width: 36px !important; max-width: 36px !important; height: 36px !important; padding: 0 !important; border-radius: 50% !important; border: 2px solid transparent !important; cursor: pointer !important; box-shadow: none !important; } #swatch-custom:hover { border-color: rgba(255,255,255,0.5) !important; } #swatch-custom.selected { border-color: #fff !important; } /* Custom color picker — hide all labels/headers */ #custom-color-picker .label-wrap, #custom-color-picker label, #custom-color-picker .block-label, #custom-color-picker span.svelte-1gfkn6j, #custom-color-picker > span { display: none !important; } #custom-color-picker, #custom-color-picker fieldset, fieldset#custom-color-picker { min-height: 0 !important; padding: 0 !important; border: none !important; background: #272727 !important; display: flex !important; justify-content: center !important; } /* Force dark background on ALL descendants of the color picker */ #custom-color-picker *, #custom-color-picker div, #custom-color-picker fieldset, #custom-color-picker .block, #custom-color-picker .wrap { background-color: #272727 !important; border-color: #3a3a3a !important; } /* Hide the trigger swatch, keep popup functional */ #custom-color-picker .wrap { height: 0 !important; overflow: visible !important; } #custom-color-picker button { height: 0 !important; width: 0 !important; padding: 0 !important; border: none !important; overflow: visible !important; } /* Hide Hex/RGB/HSL mode switcher buttons */ button.svelte-nbn1m9 { display: none !important; } /* Force all group/panel backgrounds to match */ .gr-group, .gr-block, .gr-panel, .group, .panel, div[class*="group"], div[class*="panel"] { background: #272727 !important; } /* Color row layout — centered in box */ #color-row, #color-row.svelte-7xavid { gap: 6px !important; align-items: center !important; justify-content: center !important; padding: 10px 0 6px !important; background: #272727 !important; background-color: #272727 !important; } """ _dark_theme = gr.themes.Soft( primary_hue=gr.themes.Color( c50="#02C160", c100="rgba(2,193,96,0.2)", c200="#02C160", c300="rgba(2,193,96,0.32)", c400="rgba(2,193,96,0.32)", c500="rgba(2,193,96,1.0)", c600="rgba(2,193,96,1.0)", c700="rgba(2,193,96,0.32)", c800="rgba(2,193,96,0.32)", c900="#02C160", c950="#02C160", ), secondary_hue=gr.themes.Color( c50="#576b95", c100="#576b95", c200="#576b95", c300="#576b95", c400="#576b95", c500="#576b95", c600="#576b95", c700="#576b95", c800="#576b95", c900="#576b95", c950="#576b95", ), neutral_hue=gr.themes.Color( c50="#2a2a2a", c100="#313131", c200="#3a3a3a", c300="#4a4a4a", c400="#B2B2B2", c500="#808080", c600="#636363", c700="#515151", c800="#393939", c900="#272727", c950="#171717", ), font=[gr.themes.GoogleFont("Montserrat"), "ui-sans-serif", "system-ui", "sans-serif"], font_mono=[gr.themes.GoogleFont("IBM Plex Mono"), "ui-monospace", "Consolas", "monospace"], ).set( body_background_fill="#171717", body_background_fill_dark="#171717", body_text_color="#e0e0e0", body_text_color_dark="#e0e0e0", body_text_color_subdued="#808080", body_text_color_subdued_dark="#808080", block_background_fill="#272727", block_background_fill_dark="#272727", block_border_color="#3a3a3a", block_border_color_dark="#3a3a3a", block_border_width="0px", block_label_background_fill="rgba(2,193,96,0.2)", block_label_background_fill_dark="rgba(2,193,96,0.2)", block_label_text_color="rgba(2,193,96,1.0)", block_label_text_color_dark="rgba(2,193,96,1.0)", block_title_background_fill="rgba(2,193,96,0.2)", block_title_text_color="rgba(2,193,96,1.0)", block_title_text_color_dark="rgba(2,193,96,1.0)", input_background_fill="#313131", input_background_fill_dark="#313131", input_border_color="#3a3a3a", input_border_color_dark="#3a3a3a", input_border_width="0px", button_primary_background_fill="#06AE56", button_primary_background_fill_dark="#06AE56", button_primary_background_fill_hover="#07C863", button_primary_background_fill_hover_dark="#07C863", button_primary_border_color="#06AE56", button_primary_border_color_dark="#06AE56", button_primary_text_color="#FFFFFF", button_primary_text_color_dark="#FFFFFF", button_secondary_background_fill="#2B2B2B", button_secondary_background_fill_dark="#2B2B2B", button_secondary_text_color="#FFFFFF", button_secondary_text_color_dark="#FFFFFF", background_fill_primary="#171717", background_fill_primary_dark="#171717", background_fill_secondary="#272727", background_fill_secondary_dark="#272727", border_color_primary="#3a3a3a", border_color_primary_dark="#3a3a3a", panel_background_fill="#272727", panel_background_fill_dark="#272727", panel_border_color="#3a3a3a", panel_border_color_dark="#3a3a3a", shadow_drop="0 1px 4px 0 rgb(0 0 0 / 0.3)", shadow_drop_lg="0 2px 5px 0 rgb(0 0 0 / 0.3)", color_accent_soft="#272727", color_accent_soft_dark="#272727", ) with gr.Blocks( title="SyncAI", theme=_dark_theme, css=_custom_css, ) as demo: gr.Markdown("# SyncAI\n### AI Music Ads Generator") gr.Markdown( "Upload a song (~15s clip), pick a visual style, and generate " "a beat-synced music video ad." ) # --- Build example song/cover art maps --- _EXAMPLES_DIR = Path("examples") _COVER_ART_MAP = { "Gone": "Gone.jpg", "Cant find myself": "Cant find myself.png", "The more I do": "The more I do.png", "House of House": "House of House.png", } _example_songs = {} _example_covers = {} if _EXAMPLES_DIR.exists(): for wav in sorted(_EXAMPLES_DIR.glob("*.wav")): _example_songs[wav.stem] = str(wav) cover_file = _COVER_ART_MAP.get(wav.stem, "") cover_path = _EXAMPLES_DIR / cover_file if cover_path.exists(): _example_covers[wav.stem] = str(cover_path) def _on_example_song(song_name, cover_mode): if not song_name: return None, None audio = _example_songs.get(song_name) cover = _example_covers.get(song_name) if cover_mode == "With cover art" else None return audio, cover with gr.Row(equal_height=True): # --- Left: Song --- with gr.Column(): audio_input = gr.Audio( label="Upload Song", type="filepath", sources=["upload"], ) with gr.Group(): example_song = gr.Dropdown( choices=list(_example_songs.keys()) if _example_songs else [], value=None, label="Or pick an example", info="Pre-loaded ~15s song clips to try the pipeline", ) example_cover_mode = gr.Radio( choices=["With cover art", "Without cover art"], value="With cover art", show_label=False, info="Include album artwork overlay from the drop onwards", ) # --- Center: Cover art --- with gr.Column(): cover_art_input = gr.Image( label="Cover Art (optional)", type="filepath", sources=["upload"], ) # --- Right: Visual Style --- with gr.Column(): style_dropdown = gr.Dropdown( choices=style_names(), value="Sunset Coastal Drive", label="Visual Style", info="LoRA style applied to generated images", ) # --- Resume (dev only, below main row) --- with gr.Row(visible=not IS_SPACES): with gr.Column(): with gr.Group(): run_mode = gr.Radio( choices=["New Run", "Resume Existing"], value="New Run", label="Run Mode", ) existing_run = gr.Dropdown( choices=_list_runs(), label="Existing Run", visible=False, ) start_step = gr.Dropdown( choices=[], label="Start From Step", visible=False, ) reuse_files = gr.Checkbox( value=True, label="Reuse existing images & videos", info="Uncheck to regenerate images and video clips", visible=False, ) generate_btn = gr.Button("Generate Video", variant="primary") video_output = gr.Video(label="Generated Music Video") reshuffle_btn = gr.Button("Reshuffle", variant="secondary", visible=True, interactive=False) last_run_dir = gr.State(value="") # Hidden state for passing data between chained pipeline phases _st_run_dir = gr.State(value="") _st_style = gr.State(value="") _st_step = gr.State(value="1") _st_cover = gr.State(value="") # --- Event handlers --- example_song.change( fn=_on_example_song, inputs=[example_song, example_cover_mode], outputs=[audio_input, cover_art_input], ) example_cover_mode.change( fn=_on_example_song, inputs=[example_song, example_cover_mode], outputs=[audio_input, cover_art_input], ) run_mode.change( fn=_on_run_mode_change, inputs=run_mode, outputs=[audio_input, existing_run, start_step, reuse_files], ) existing_run.change( fn=_on_run_selected, inputs=existing_run, outputs=start_step, ) generate_btn.click( fn=generate_cpu, inputs=[audio_input, style_dropdown, cover_art_input, run_mode, existing_run, start_step, reuse_files], outputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover], ).then( fn=generate_images, inputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover], outputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover], ).then( fn=generate_videos, inputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover], outputs=[video_output, _st_run_dir, _st_cover], ).then( fn=generate_assembly, inputs=[_st_run_dir, _st_cover], outputs=[video_output, last_run_dir, reshuffle_btn], ) reshuffle_btn.click( fn=reshuffle, inputs=[last_run_dir, cover_art_input], outputs=video_output, ) if __name__ == "__main__": demo.launch()