"""SyncAI — AI Music Video Generator.

Gradio app that orchestrates the full pipeline:
Song → Stems → Lyrics + Beats → Segments → Prompts → Images → Video → Assembly

Works locally (fal.ai API for video) and on HuggingFace Spaces (on-device Wan 2.1).
"""
| |
|
| | import json |
| | import os |
| | import shutil |
| | from pathlib import Path |
| |
|
| | from dotenv import load_dotenv |
| | load_dotenv() |
| |
|
| | import gradio as gr |
| | import torch |
| |
|
| | |
| | |
| | from src.assembler import font_names, DEFAULT_FONT, DEFAULT_FONT_COLOR |
| | from src.styles import style_names, get_style |
| |
|
| | |
| | |
| | |
| |
|
# True when the SPACE_ID environment variable is set (running on HuggingFace Spaces).
IS_SPACES = "SPACE_ID" in os.environ

if IS_SPACES:
    # Only imported on Spaces; used below to wrap the GPU functions.
    import spaces

# Local input folder, created at import time if it is missing.
INPUT_DIR = Path("input")
INPUT_DIR.mkdir(exist_ok=True)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _gpu_generate_images(run_dir, style_name):
    """GPU phase: render the images for a run.

    Dispatches to ``src.image_generator_hf`` on Spaces and to
    ``src.image_generator_api`` otherwise, then releases torch's cached
    CUDA memory.

    Args:
        run_dir: Run directory, forwarded unchanged to the backend.
        style_name: Visual style name, forwarded as ``style_name=``.
    """
    print(f"[GPU] Generating images (run_dir={run_dir}, style={style_name})")
    # Backends are imported lazily so only the one in use gets loaded.
    if not IS_SPACES:
        from src.image_generator_api import run as render_images
    else:
        from src.image_generator_hf import run as render_images
    render_images(run_dir, style_name=style_name)
    torch.cuda.empty_cache()
    print("[GPU] Image generation complete.")
| |
|
| |
|
| | def _gpu_generate_one_video(run_dir, segment_idx, prompt, negative_prompt, seed): |
| | """GPU phase: generate a single video clip. Each call gets a fresh ZeroGPU token.""" |
| | import time |
| | run_dir = Path(run_dir) |
| | image_path = run_dir / "images" / f"segment_{segment_idx:03d}.png" |
| | clip_path = run_dir / "clips" / f"clip_{segment_idx:03d}.mp4" |
| | clip_path.parent.mkdir(parents=True, exist_ok=True) |
| |
|
| | if clip_path.exists(): |
| | print(f" [GPU] Clip {segment_idx}: already exists, skipping") |
| | return |
| |
|
| | if not image_path.exists(): |
| | print(f" [GPU] Clip {segment_idx}: image not found, skipping") |
| | return |
| |
|
| | if IS_SPACES: |
| | from src.video_generator_hf import generate_clip |
| | else: |
| | from src.video_generator_api import generate_clip |
| |
|
| | print(f" [GPU] Generating clip {segment_idx}...") |
| | t0 = time.time() |
| | generate_clip(image_path, prompt, clip_path, negative_prompt, seed=seed) |
| | print(f" [GPU] Clip {segment_idx} done ({time.time() - t0:.1f}s)") |
| |
|
| |
|
| | |
# On Spaces, rebind both GPU functions to their spaces.GPU-wrapped versions so
# every later call goes through the wrapper. The durations passed are 300 for
# image generation and 600 for a single clip.
if IS_SPACES:
    _gpu_generate_images = spaces.GPU(duration=300)(_gpu_generate_images)
    _gpu_generate_one_video = spaces.GPU(duration=600)(_gpu_generate_one_video)
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Root folder scanned by _list_runs(): one sub-directory per song, each holding
# run_* directories.
DATA_DIR = Path("data")

# Pipeline stages in execution order. The leading number is significant:
# generate_cpu() parses it from the selected label with split(".")[0].
STEPS = [
    "1. Stems",
    "2. Lyrics",
    "3. Beats",
    "4. Segmentation",
    "5. Prompts",
    "6. Images",
    "7. Videos",
    "8. Assembly",
]
| |
|
| |
|
def _list_runs() -> list[str]:
    """List every existing run as ``"<song>/<run_*>"``, sorted by song then run.

    Returns an empty list when the data directory does not exist. Only
    directories are considered at both levels.
    """
    if not DATA_DIR.exists():
        return []
    return [
        f"{song.name}/{run.name}"
        for song in sorted(DATA_DIR.iterdir())
        if song.is_dir()
        for run in sorted(song.glob("run_*"))
        if run.is_dir()
    ]
| |
|
| |
|
| | def _detect_completed_steps(run_dir: Path) -> int: |
| | """Return the number of the last fully completed step (0 = nothing done).""" |
| | |
| | stems = run_dir / "stems" |
| | for name in ["drums.wav", "vocals.wav"]: |
| | if not (stems / name).exists(): |
| | return 0 |
| |
|
| | |
| | lyrics_path = run_dir / "lyrics.json" |
| | if not lyrics_path.exists(): |
| | return 1 |
| | try: |
| | data = json.loads(lyrics_path.read_text()) |
| | if not isinstance(data, list) or len(data) == 0: |
| | return 1 |
| | except (json.JSONDecodeError, OSError): |
| | return 1 |
| |
|
| | |
| | beats_path = run_dir / "beats.json" |
| | if not beats_path.exists(): |
| | return 2 |
| | try: |
| | data = json.loads(beats_path.read_text()) |
| | if not isinstance(data, list) or len(data) == 0: |
| | return 2 |
| | except (json.JSONDecodeError, OSError): |
| | return 2 |
| |
|
| | |
| | seg_path = run_dir / "segments.json" |
| | if not seg_path.exists(): |
| | return 3 |
| | try: |
| | segments = json.loads(seg_path.read_text()) |
| | if not isinstance(segments, list) or len(segments) == 0: |
| | return 3 |
| | if "start" not in segments[0] or "end" not in segments[0]: |
| | return 3 |
| | except (json.JSONDecodeError, OSError): |
| | return 3 |
| |
|
| | |
| | try: |
| | if not all(seg.get("prompt") for seg in segments): |
| | return 4 |
| | except Exception: |
| | return 4 |
| |
|
| | n_segments = len(segments) |
| |
|
| | |
| | for i in range(1, n_segments + 1): |
| | if not (run_dir / "images" / f"segment_{i:03d}.png").exists(): |
| | return 5 |
| |
|
| | |
| | for i in range(1, n_segments + 1): |
| | if not (run_dir / "clips" / f"clip_{i:03d}.mp4").exists(): |
| | return 6 |
| |
|
| | |
| | final = run_dir / "output" / "final.mp4" |
| | if not final.exists() or final.stat().st_size == 0: |
| | return 7 |
| |
|
| | return 8 |
| |
|
| |
|
def _get_startable_steps(run_dir: Path) -> list[str]:
    """Return the step labels a resume may start from for *run_dir*.

    Every completed step can be re-run, and so can the first step that is not
    yet completed, because its prerequisites are met. The count is capped at 8.
    """
    upper = _detect_completed_steps(run_dir) + 1
    if upper > 8:
        upper = 8
    return STEPS[:upper]
| |
|
| |
|
def _on_run_mode_change(run_mode):
    """Switch the form between "upload a song" and "resume a run" controls.

    Returns four updates, in the order the handler is wired: the audio upload,
    the run dropdown (with a refreshed run list), the step dropdown (cleared),
    and the reuse checkbox.
    """
    resuming = run_mode == "Resume Existing"
    upload_update = gr.update(visible=not resuming)
    runs_update = gr.update(visible=resuming, choices=_list_runs())
    step_update = gr.update(visible=resuming, choices=[], value=None)
    reuse_update = gr.update(visible=resuming)
    return upload_update, runs_update, step_update, reuse_update
| |
|
| |
|
def _on_run_selected(existing_run):
    """Refresh the step dropdown for the chosen run.

    The latest startable step is preselected. With no run chosen, the dropdown
    is emptied.
    """
    if existing_run:
        options = _get_startable_steps(DATA_DIR / existing_run)
        preselected = options[-1] if options else None
        return gr.update(choices=options, value=preselected)
    return gr.update(choices=[], value=None)
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Preset label -> hex colour. The five hex values are the same ones used for
# #swatch-0 .. #swatch-4 in _custom_css. "Custom" carries no fixed colour.
# NOTE(review): nothing in the visible part of this file reads this mapping —
# confirm whether it is still needed.
_COLOR_PRESETS = {
    "Warm White": "#FFF7D4",
    "White": "#FFFFFF",
    "Red": "#FF3B30",
    "Cyan": "#00E5FF",
    "Gold": "#FFD700",
    "Custom": None,
}
| |
|
| |
|
def _remove_tree_if_present(path: Path) -> None:
    """Recursively delete *path* when it exists."""
    if path.exists():
        shutil.rmtree(path)


def _prepare_resume(existing_run, start_step, reuse_files):
    """Validate a resume request and clear stale artefacts.

    Returns:
        ``(run_dir, step_num)`` for the selected run.

    Raises:
        gr.Error: If no run or step is selected, or the run directory is gone.
    """
    if not existing_run:
        raise gr.Error("Please select an existing run.")
    if not start_step:
        raise gr.Error("Please select a step to start from.")
    run_dir = DATA_DIR / existing_run
    if not run_dir.exists():
        raise gr.Error(f"Run directory not found: {run_dir}")
    # Step labels look like "6. Images"; the number before the dot is the step.
    step_num = int(start_step.split(".")[0])
    print(f"Resuming {existing_run} from step {step_num}")

    # Assembly artefacts are always rebuilt on resume.
    for name in ("output", "clips_split", "clips_trimmed"):
        _remove_tree_if_present(run_dir / name)

    # Images/clips are only dropped when the user asked for regeneration and
    # the chosen start step is at or before the step that produces them.
    if not reuse_files:
        if step_num <= 6:
            _remove_tree_if_present(run_dir / "images")
        if step_num <= 7:
            _remove_tree_if_present(run_dir / "clips")
    return run_dir, step_num


def _find_source_audio(song_dir: Path) -> Path:
    """Return the song file in *song_dir*, preferring wav, mp3, flac, then m4a.

    Matches are sorted within each extension so the choice does not depend on
    directory listing order.

    Raises:
        gr.Error: If the directory holds no supported audio file.
    """
    for pattern in ("*.wav", "*.mp3", "*.flac", "*.m4a"):
        matches = sorted(song_dir.glob(pattern))
        if matches:
            return matches[0]
    raise gr.Error(f"No audio file found in {song_dir}")


def _flush_memory():
    """Run the garbage collector and empty torch's MPS/CUDA caches if present."""
    import gc

    gc.collect()
    if hasattr(torch, "mps") and torch.backends.mps.is_available():
        torch.mps.empty_cache()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


def generate_cpu(audio_file: str, style_name: str, cover_art: str | None,
                 run_mode: str, existing_run: str | None, start_step: str | None,
                 reuse_files: bool, progress=gr.Progress()):
    """CPU phase: steps 1-5 (stems, lyrics, beats, segmentation, prompts).

    Args:
        audio_file: Path of the uploaded song (new runs only).
        style_name: Name of the visual style to look up with ``get_style``.
        cover_art: Optional cover-art path; passed through to later phases.
        run_mode: ``"Resume Existing"`` resumes a run, anything else starts one.
        existing_run: ``"<song>/<run>"`` under the data directory when resuming.
        start_step: Step label (e.g. ``"4. Segmentation"``) when resuming.
        reuse_files: When resuming, keep existing images and clips.
        progress: Gradio progress callback.

    Returns:
        A 5-tuple for the chained GPU phases: a no-op video update, the run
        directory, the style name, the start step number (both as strings),
        and the cover-art path or ``""``.

    Raises:
        gr.Error: On missing input, missing run, or missing source audio.
    """
    style = get_style(style_name)
    is_resume = run_mode == "Resume Existing"

    if is_resume:
        run_dir, step_num = _prepare_resume(existing_run, start_step, reuse_files)
    else:
        if audio_file is None:
            raise gr.Error("Please upload a song first.")
        step_num = 1

    # Step 1: stem separation.
    if step_num <= 1:
        progress(0.0, desc="Separating stems...")
        from src.stem_separator import separate_stems
        if is_resume:
            # The original song is expected next to the run directories.
            source_audio = _find_source_audio(run_dir.parent)
            separate_stems(source_audio, output_dir=run_dir / "stems")
        else:
            result = separate_stems(Path(audio_file))
            # NOTE(review): a new run takes its directory from the separator;
            # a resumed run keeps the directory the user selected.
            run_dir = result["run_dir"]
        print(f"Run directory: {run_dir}")

    # Step 2: lyrics from the vocal stem.
    if step_num <= 2:
        progress(0.15, desc="Extracting lyrics...")
        from src.lyrics_extractor import extract_lyrics
        vocals_path = run_dir / "stems" / "vocals.wav"
        extract_lyrics(vocals_path)
        del extract_lyrics
        _flush_memory()

    # Step 3: beats from the drum stem.
    if step_num <= 3:
        progress(0.25, desc="Detecting beats...")
        from src.beat_detector import run as detect_beats
        drums_path = run_dir / "stems" / "drums.wav"
        detect_beats(drums_path)
        del detect_beats
        _flush_memory()

    # Step 4: segmentation.
    if step_num <= 4:
        progress(0.35, desc="Segmenting lyrics to beats...")
        from src.segmenter import run as segment_lyrics
        segment_lyrics(run_dir)

    # Step 5: prompt generation, driven by the selected style.
    if step_num <= 5:
        progress(0.40, desc="Generating prompts...")
        from src.prompt_generator import run as generate_prompts
        generate_prompts(run_dir, style_description=style["description"],
                         image_prompt_guidance=style.get("image_prompt_guidance", ""),
                         quality_suffix=style.get("quality_suffix", ""))
        print("Prompt generation complete.")

    progress(0.45, desc="CPU steps done, requesting GPU...")
    # State travels between chained handlers as plain strings.
    return gr.update(), str(run_dir), style_name, str(step_num), cover_art or ""
| |
|
| |
|
def generate_images(video_out, run_dir_str: str, style_name: str, step_num_str: str,
                    cover_art: str, progress=gr.Progress()):
    """GPU phase: step 6 — generate images. Gets a fresh ZeroGPU token.

    The state values are returned unchanged so the next chained handler
    receives them; the first returned item is a no-op update for the video
    component.
    """
    start_from = int(step_num_str)
    # NOTE(review): the guard is "<= 7", so image generation is also invoked
    # when starting from step 7 (Videos) — confirm this is intended rather
    # than "<= 6".
    skip = start_from > 7
    if not skip:
        progress(0.50, desc="Generating images...")
        _gpu_generate_images(run_dir_str, style_name)
    return gr.update(), run_dir_str, style_name, step_num_str, cover_art
| |
|
| |
|
def generate_videos(video_out, run_dir_str: str, style_name: str, step_num_str: str,
                    cover_art: str, progress=gr.Progress()):
    """GPU phase: step 7 — generate video clips, one per GPU session.

    Reads ``segments.json`` from the run directory and renders one clip per
    segment through ``_gpu_generate_one_video``. Does nothing when the start
    step is after 7.

    Returns:
        A 3-tuple: a no-op video update, the run directory, the cover-art path.
    """
    unchanged = (gr.update(), run_dir_str, cover_art)
    if int(step_num_str) > 7:
        # Starting at assembly: return before anything touches `segments`.
        return unchanged

    run_dir = Path(run_dir_str)
    with open(run_dir / "segments.json") as f:
        segments = json.load(f)

    total = len(segments)
    seed = 42
    for i, seg in enumerate(segments):
        idx = seg["segment"]
        # Prompt fallback order: video_prompt, then scene, then prompt.
        prompt = seg.get("video_prompt", seg.get("scene", seg.get("prompt", "")))
        neg = seg.get("negative_prompt", "")
        progress(0.50 + 0.35 * (i / total),
                 desc=f"Generating video clip {i+1}/{total}...")
        # Seed is offset by the segment number so each clip gets its own seed.
        _gpu_generate_one_video(run_dir_str, idx, prompt, neg, seed + idx)

    if IS_SPACES:
        # Best effort: a failed unload must not fail the pipeline, but it is
        # reported instead of being silently dropped.
        try:
            from src.video_generator_hf import unload
            unload()
        except Exception as exc:
            print(f"[GPU] Could not unload video model: {exc}")

    print(f"All {total} video clips generated.")
    return unchanged
| |
|
| |
|
def generate_assembly(run_dir_str: str, cover_art: str, progress=gr.Progress()):
    """CPU phase: step 8 — assemble final video.

    Uses the default font and font colour. An empty *cover_art* is passed to
    the assembler as ``None``.

    Returns:
        A 3-tuple: the final video path, the run directory (stored for
        reshuffling), and an update that makes the reshuffle button
        interactive.
    """
    progress(0.90, desc="Assembling final video...")
    from src.assembler import run as assemble_video
    output_file = assemble_video(
        Path(run_dir_str),
        font_name=DEFAULT_FONT,
        font_color=DEFAULT_FONT_COLOR,
        cover_art=cover_art or None,
    )

    progress(1.0, desc="Done!")
    return str(output_file), run_dir_str, gr.update(interactive=True)
| |
|
| |
|
def reshuffle(run_dir_str: str, cover_art: str | None, progress=gr.Progress()):
    """Re-run only the assembly step for the last run and return the new video path.

    The ``clips_trimmed`` and ``output`` folders are removed first so the
    assembler starts clean.

    Raises:
        gr.Error: If there is no previous run or its directory no longer exists.
    """
    if not run_dir_str:
        raise gr.Error("No previous run to reshuffle. Generate a video first.")

    run_dir = Path(run_dir_str)
    if not run_dir.exists():
        raise gr.Error(f"Run directory not found: {run_dir}")

    # Drop the previous assembly artefacts.
    for stale in (run_dir / name for name in ("clips_trimmed", "output")):
        if stale.exists():
            shutil.rmtree(stale)

    progress(0.2, desc="Reshuffling and assembling...")
    from src.assembler import run as assemble_video
    final_path = assemble_video(
        run_dir,
        font_name=DEFAULT_FONT,
        font_color=DEFAULT_FONT_COLOR,
        cover_art=cover_art,
    )

    progress(1.0, desc="Done!")
    return str(final_path)
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Extra CSS passed to gr.Blocks(css=...). It targets the element ids
# #font-dropdown, #swatch-0..4, #swatch-custom, #custom-color-picker and
# #color-row, and forces the #272727 panel background.
# NOTE(review): several selectors rely on generated class names
# (.svelte-1gfkn6j, .svelte-nbn1m9, .svelte-7xavid) — presumably tied to one
# Gradio build; verify after upgrades.
# NOTE(review): the "β" in two CSS comments below looks like a mis-encoded
# dash; it is left untouched because it is part of the runtime string.
_custom_css = """
/* Load Google Fonts for dropdown preview */
@import url('https://fonts.googleapis.com/css2?family=Bebas+Neue&family=Teko:wght@700&family=Russo+One&family=Staatliches&display=swap');
/* Style font dropdown options in their actual font */
#font-dropdown [data-value="Bebas Neue"], #font-dropdown li:nth-child(1) { font-family: 'Bebas Neue', sans-serif !important; }
#font-dropdown [data-value="Teko"], #font-dropdown li:nth-child(2) { font-family: 'Teko', sans-serif !important; font-weight: 700 !important; }
#font-dropdown [data-value="Russo One"], #font-dropdown li:nth-child(3) { font-family: 'Russo One', sans-serif !important; }
#font-dropdown [data-value="Staatliches"], #font-dropdown li:nth-child(4) { font-family: 'Staatliches', sans-serif !important; }
#font-dropdown ul li { font-size: 16px !important; }
/* Remove white border on color picker */
input[type="color"],
input[type="color"]:focus,
input[type="color"]:hover,
.gr-color-picker input,
div[data-testid="color-picker"] input,
div[data-testid="color-picker"] div,
.color-picker input {
border: none !important;
outline: none !important;
box-shadow: none !important;
background: transparent !important;
}
/* Color swatch buttons */
.color-swatch {
min-width: 36px !important;
max-width: 36px !important;
height: 36px !important;
padding: 0 !important;
border-radius: 6px !important;
border: 2px solid transparent !important;
cursor: pointer !important;
box-shadow: none !important;
transition: border-color 0.15s ease !important;
}
.color-swatch:hover {
border-color: rgba(255,255,255,0.5) !important;
}
.color-swatch.selected {
border-color: #fff !important;
}
#swatch-0 { background: #FFF7D4 !important; }
#swatch-1 { background: #FFFFFF !important; }
#swatch-2 { background: #FF3B30 !important; }
#swatch-3 { background: #00E5FF !important; }
#swatch-4 { background: #FFD700 !important; }
#swatch-custom {
background: conic-gradient(red, yellow, lime, aqua, blue, magenta, red);
min-width: 36px !important;
max-width: 36px !important;
height: 36px !important;
padding: 0 !important;
border-radius: 50% !important;
border: 2px solid transparent !important;
cursor: pointer !important;
box-shadow: none !important;
}
#swatch-custom:hover {
border-color: rgba(255,255,255,0.5) !important;
}
#swatch-custom.selected {
border-color: #fff !important;
}
/* Custom color picker β hide all labels/headers */
#custom-color-picker .label-wrap,
#custom-color-picker label,
#custom-color-picker .block-label,
#custom-color-picker span.svelte-1gfkn6j,
#custom-color-picker > span { display: none !important; }
#custom-color-picker,
#custom-color-picker fieldset,
fieldset#custom-color-picker {
min-height: 0 !important;
padding: 0 !important;
border: none !important;
background: #272727 !important;
display: flex !important;
justify-content: center !important;
}
/* Force dark background on ALL descendants of the color picker */
#custom-color-picker *,
#custom-color-picker div,
#custom-color-picker fieldset,
#custom-color-picker .block,
#custom-color-picker .wrap {
background-color: #272727 !important;
border-color: #3a3a3a !important;
}
/* Hide the trigger swatch, keep popup functional */
#custom-color-picker .wrap { height: 0 !important; overflow: visible !important; }
#custom-color-picker button { height: 0 !important; width: 0 !important; padding: 0 !important; border: none !important; overflow: visible !important; }
/* Hide Hex/RGB/HSL mode switcher buttons */
button.svelte-nbn1m9 { display: none !important; }
/* Force all group/panel backgrounds to match */
.gr-group, .gr-block, .gr-panel, .group, .panel,
div[class*="group"], div[class*="panel"] {
background: #272727 !important;
}
/* Color row layout β centered in box */
#color-row, #color-row.svelte-7xavid {
gap: 6px !important;
align-items: center !important;
justify-content: center !important;
padding: 10px 0 6px !important;
background: #272727 !important;
background-color: #272727 !important;
}
"""
| |
|
# Theme passed to gr.Blocks(theme=...): gr.themes.Soft with custom hues and
# fonts, then .set(...) overrides. Most overrides come in light/"_dark" pairs
# with identical values, so the UI looks the same in both modes.
_dark_theme = gr.themes.Soft(
    # Green accent palette; several slots are translucent rgba() variants.
    primary_hue=gr.themes.Color(
        c50="#02C160", c100="rgba(2,193,96,0.2)", c200="#02C160",
        c300="rgba(2,193,96,0.32)", c400="rgba(2,193,96,0.32)",
        c500="rgba(2,193,96,1.0)", c600="rgba(2,193,96,1.0)",
        c700="rgba(2,193,96,0.32)", c800="rgba(2,193,96,0.32)",
        c900="#02C160", c950="#02C160",
    ),
    # Secondary hue is one flat colour in every slot.
    secondary_hue=gr.themes.Color(
        c50="#576b95", c100="#576b95", c200="#576b95", c300="#576b95",
        c400="#576b95", c500="#576b95", c600="#576b95", c700="#576b95",
        c800="#576b95", c900="#576b95", c950="#576b95",
    ),
    # Grey scale used for neutral surfaces and text.
    neutral_hue=gr.themes.Color(
        c50="#2a2a2a", c100="#313131", c200="#3a3a3a", c300="#4a4a4a",
        c400="#B2B2B2", c500="#808080", c600="#636363", c700="#515151",
        c800="#393939", c900="#272727", c950="#171717",
    ),
    font=[gr.themes.GoogleFont("Montserrat"), "ui-sans-serif", "system-ui", "sans-serif"],
    font_mono=[gr.themes.GoogleFont("IBM Plex Mono"), "ui-monospace", "Consolas", "monospace"],
).set(
    # Page background and text.
    body_background_fill="#171717",
    body_background_fill_dark="#171717",
    body_text_color="#e0e0e0",
    body_text_color_dark="#e0e0e0",
    body_text_color_subdued="#808080",
    body_text_color_subdued_dark="#808080",
    # Blocks: #272727 surface, borders disabled via zero width.
    block_background_fill="#272727",
    block_background_fill_dark="#272727",
    block_border_color="#3a3a3a",
    block_border_color_dark="#3a3a3a",
    block_border_width="0px",
    # Labels and titles: green text on a translucent green fill.
    block_label_background_fill="rgba(2,193,96,0.2)",
    block_label_background_fill_dark="rgba(2,193,96,0.2)",
    block_label_text_color="rgba(2,193,96,1.0)",
    block_label_text_color_dark="rgba(2,193,96,1.0)",
    # NOTE(review): unlike its neighbours this key has no "_dark" twin — confirm.
    block_title_background_fill="rgba(2,193,96,0.2)",
    block_title_text_color="rgba(2,193,96,1.0)",
    block_title_text_color_dark="rgba(2,193,96,1.0)",
    # Inputs.
    input_background_fill="#313131",
    input_background_fill_dark="#313131",
    input_border_color="#3a3a3a",
    input_border_color_dark="#3a3a3a",
    input_border_width="0px",
    # Primary (green) and secondary (grey) buttons.
    button_primary_background_fill="#06AE56",
    button_primary_background_fill_dark="#06AE56",
    button_primary_background_fill_hover="#07C863",
    button_primary_background_fill_hover_dark="#07C863",
    button_primary_border_color="#06AE56",
    button_primary_border_color_dark="#06AE56",
    button_primary_text_color="#FFFFFF",
    button_primary_text_color_dark="#FFFFFF",
    button_secondary_background_fill="#2B2B2B",
    button_secondary_background_fill_dark="#2B2B2B",
    button_secondary_text_color="#FFFFFF",
    button_secondary_text_color_dark="#FFFFFF",
    # Generic fills, borders, panels, shadows.
    background_fill_primary="#171717",
    background_fill_primary_dark="#171717",
    background_fill_secondary="#272727",
    background_fill_secondary_dark="#272727",
    border_color_primary="#3a3a3a",
    border_color_primary_dark="#3a3a3a",
    panel_background_fill="#272727",
    panel_background_fill_dark="#272727",
    panel_border_color="#3a3a3a",
    panel_border_color_dark="#3a3a3a",
    shadow_drop="0 1px 4px 0 rgb(0 0 0 / 0.3)",
    shadow_drop_lg="0 2px 5px 0 rgb(0 0 0 / 0.3)",
    color_accent_soft="#272727",
    color_accent_soft_dark="#272727",
)
| |
|
# Build the Gradio UI. `demo` is the Blocks app launched from the __main__ guard.
with gr.Blocks(
    title="SyncAI",
    theme=_dark_theme,
    css=_custom_css,
) as demo:
    gr.Markdown("# SyncAI\n### AI Music Ads Generator")
    gr.Markdown(
        "Upload a song (~15s clip), pick a visual style, and generate "
        "a beat-synced music video ad."
    )

    # --- Bundled examples: every .wav in examples/, plus optional cover art ---
    _EXAMPLES_DIR = Path("examples")
    _COVER_ART_MAP = {
        "Gone": "Gone.jpg",
        "Cant find myself": "Cant find myself.png",
        "The more I do": "The more I do.png",
        "House of House": "House of House.png",
    }
    _example_songs = {}
    _example_covers = {}
    if _EXAMPLES_DIR.exists():
        for wav in sorted(_EXAMPLES_DIR.glob("*.wav")):
            _example_songs[wav.stem] = str(wav)
            cover_file = _COVER_ART_MAP.get(wav.stem)
            if not cover_file:
                # No mapping: joining an empty name would resolve to the
                # examples directory itself, which must not count as a cover.
                continue
            cover_path = _EXAMPLES_DIR / cover_file
            if cover_path.is_file():
                _example_covers[wav.stem] = str(cover_path)

    def _on_example_song(song_name, cover_mode):
        """Return (audio path, cover path) for the chosen example.

        The cover is None unless "With cover art" is selected and the example
        has one. Both values are None when no example is selected.
        """
        if not song_name:
            return None, None
        audio = _example_songs.get(song_name)
        cover = _example_covers.get(song_name) if cover_mode == "With cover art" else None
        return audio, cover

    with gr.Row(equal_height=True):
        # Column 1: song upload and example picker.
        with gr.Column():
            audio_input = gr.Audio(
                label="Upload Song",
                type="filepath",
                sources=["upload"],
            )
            with gr.Group():
                example_song = gr.Dropdown(
                    choices=list(_example_songs),
                    value=None,
                    label="Or pick an example",
                    info="Pre-loaded ~15s song clips to try the pipeline",
                )
                example_cover_mode = gr.Radio(
                    choices=["With cover art", "Without cover art"],
                    value="With cover art",
                    show_label=False,
                    info="Include album artwork overlay from the drop onwards",
                )

        # Column 2: optional cover art.
        with gr.Column():
            cover_art_input = gr.Image(
                label="Cover Art (optional)",
                type="filepath",
                sources=["upload"],
            )

        # Column 3: visual style.
        with gr.Column():
            style_dropdown = gr.Dropdown(
                choices=style_names(),
                value="Sunset Coastal Drive",
                label="Visual Style",
                info="LoRA style applied to generated images",
            )

    # Resume controls are hidden on Spaces.
    with gr.Row(visible=not IS_SPACES):
        with gr.Column():
            with gr.Group():
                run_mode = gr.Radio(
                    choices=["New Run", "Resume Existing"],
                    value="New Run",
                    label="Run Mode",
                )
                existing_run = gr.Dropdown(
                    choices=_list_runs(),
                    label="Existing Run",
                    visible=False,
                )
                start_step = gr.Dropdown(
                    choices=[],
                    label="Start From Step",
                    visible=False,
                )
                reuse_files = gr.Checkbox(
                    value=True,
                    label="Reuse existing images & videos",
                    info="Uncheck to regenerate images and video clips",
                    visible=False,
                )

    generate_btn = gr.Button("Generate Video", variant="primary")
    video_output = gr.Video(label="Generated Music Video")
    reshuffle_btn = gr.Button("Reshuffle", variant="secondary", visible=True, interactive=False)
    last_run_dir = gr.State(value="")

    # State handed from one chained pipeline phase to the next.
    _st_run_dir = gr.State(value="")
    _st_style = gr.State(value="")
    _st_step = gr.State(value="1")
    _st_cover = gr.State(value="")

    # --- Event wiring ---
    example_song.change(
        fn=_on_example_song,
        inputs=[example_song, example_cover_mode],
        outputs=[audio_input, cover_art_input],
    )
    example_cover_mode.change(
        fn=_on_example_song,
        inputs=[example_song, example_cover_mode],
        outputs=[audio_input, cover_art_input],
    )

    run_mode.change(
        fn=_on_run_mode_change,
        inputs=run_mode,
        outputs=[audio_input, existing_run, start_step, reuse_files],
    )
    existing_run.change(
        fn=_on_run_selected,
        inputs=existing_run,
        outputs=start_step,
    )

    # Each phase runs only if the previous one finished without raising, so a
    # validation error in the CPU phase does not start GPU work on stale state.
    generate_btn.click(
        fn=generate_cpu,
        inputs=[audio_input, style_dropdown,
                cover_art_input, run_mode, existing_run, start_step, reuse_files],
        outputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover],
    ).success(
        fn=generate_images,
        inputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover],
        outputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover],
    ).success(
        fn=generate_videos,
        inputs=[video_output, _st_run_dir, _st_style, _st_step, _st_cover],
        outputs=[video_output, _st_run_dir, _st_cover],
    ).success(
        fn=generate_assembly,
        inputs=[_st_run_dir, _st_cover],
        outputs=[video_output, last_run_dir, reshuffle_btn],
    )
    reshuffle_btn.click(
        fn=reshuffle,
        inputs=[last_run_dir, cover_art_input],
        outputs=video_output,
    )
| |
|
| |
|
# Launch the app only when this file is executed directly, not on import.
if __name__ == "__main__":
    demo.launch()
| |
|