"""app.py β WAN 2.1 Dataset Creator (HuggingFace Spaces Edition).

Gradio-powered UI for preparing video + caption datasets for WAN 2.1 LoRA
training.

Tabs:
    1. Video Ingest     β upload, trim, validate source videos
    2. Caption Studio   β write / template-build captions per clip
    3. Validator        β check pairs, naming, frame counts
    4. Export & Handoff β final dataset summary + zip download

Differences from the Colab version:
- No Google Drive sync (not available on HF Spaces)
- Export produces a downloadable .zip instead of a Drive copy
- FFmpeg auto-installed at startup if not present
- All paths relative to /tmp/dataset_builder (HF writable space)
- share=False, server_name="0.0.0.0" for Spaces compatibility
"""

import os
import sys
import glob
import json
import shutil
import warnings
import subprocess
import tempfile
import zipfile
from pathlib import Path
from datetime import datetime


# -- Auto-install FFmpeg on HuggingFace Spaces --------------------------------
def _ensure_ffmpeg():
    """Best-effort install of FFmpeg via apt-get when it is not on PATH.

    Output of the apt-get calls is captured (not checked) so a failed
    install degrades gracefully instead of crashing startup.
    """
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        print("βοΈ FFmpeg not found β installing via apt-get...")
        subprocess.run(["apt-get", "update", "-qq"], capture_output=True)
        subprocess.run(["apt-get", "install", "-y", "-qq", "ffmpeg"],
                       capture_output=True)
        print("β FFmpeg installed.")


_ensure_ffmpeg()

# Imported after the FFmpeg bootstrap so startup messages appear first.
import gradio as gr

# -- Path Setup (HF Spaces uses /tmp for writable storage) --------------------
BASE_DIR = "/tmp/dataset_builder"
VIDEO_DIR = os.path.join(BASE_DIR, "videos")
CAPTION_DIR = os.path.join(BASE_DIR, "captions")
EXPORT_DIR = os.path.join(BASE_DIR, "exports")
for d in [VIDEO_DIR, CAPTION_DIR, EXPORT_DIR]:
    os.makedirs(d, exist_ok=True)


# -----------------------------------------------------------------------------
# HELPERS
# -----------------------------------------------------------------------------
def _probe_video(path: str) -> dict:
    """Use ffprobe to get video metadata.

    Returns a dict with keys: duration (s), fps, width, height, frames and
    ok (True on success). On any failure returns zeroed metadata with
    ok=False and an "error" message.
    """
    try:
        cmd = [
            "ffprobe", "-v", "quiet", "-print_format", "json",
            "-show_streams", "-show_format", path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
        data = json.loads(result.stdout)
        vstream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
            {},
        )
        duration = float(data.get("format", {}).get("duration", 0))
        fps_raw = vstream.get("r_frame_rate", "0/1")
        num, den = fps_raw.split("/")
        fps = round(float(num) / float(den), 2) if float(den) else 0
        w = int(vstream.get("width", 0))
        h = int(vstream.get("height", 0))
        # nb_frames may be absent or the literal string "N/A" for some
        # containers; fall back to duration * fps instead of failing the
        # whole probe in that case.
        try:
            frames = int(vstream.get("nb_frames", 0))
        except (TypeError, ValueError):
            frames = 0
        frames = frames or int(duration * fps)
        return {"duration": round(duration, 2), "fps": fps, "width": w,
                "height": h, "frames": frames, "ok": True}
    except Exception as e:
        return {"duration": 0, "fps": 0, "width": 0, "height": 0,
                "frames": 0, "ok": False, "error": str(e)}


def _sanitize_name(name: str) -> str:
    """Lowercase, replace non-alphanumeric chars with underscores."""
    import re
    name = os.path.splitext(name)[0]
    name = name.lower()
    name = re.sub(r"[^a-z0-9_]", "_", name)
    name = re.sub(r"_+", "_", name).strip("_")
    return name


def _get_all_videos() -> list:
    """Return list of dicts for every video in VIDEO_DIR.

    Each dict merges path/caption bookkeeping with the ffprobe metadata
    from _probe_video.
    """
    videos = []
    for f in sorted(glob.glob(os.path.join(VIDEO_DIR, "*.mp4"))):
        stem = Path(f).stem
        cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")
        # Read via a context manager with an explicit encoding (the
        # original `open(...).read()` leaked the handle and relied on the
        # locale's default encoding; captions are written as utf-8).
        if os.path.exists(cap_path):
            with open(cap_path, encoding="utf-8") as fh:
                caption = fh.read().strip()
        else:
            caption = ""
        meta = _probe_video(f)
        videos.append({
            "stem": stem,
            "video_path": f,
            "caption_path": cap_path,
            "has_caption": os.path.exists(cap_path),
            "caption": caption,
            **meta,
        })
    return videos


def _validation_issues(v: dict) -> list:
    """Return a list of human-readable issue strings for one video dict.

    Error messages are prefixed "β" and warnings "β οΈ"; the validator tab
    classifies issues by that prefix, so keep it stable.
    """
    issues = []
    if not v["has_caption"]:
        issues.append("β Missing caption file")
    if v["duration"] < 1:
        issues.append("β οΈ Duration < 1s (too short)")
    if v["duration"] > 10:
        issues.append("β οΈ Duration > 10s (trim recommended)")
    if v["frames"] < 8:
        issues.append("β Fewer than 8 frames")
    if v["fps"] < 18:
        issues.append("β οΈ Low FPS (< 18)")
    if v["width"] < 640:
        # Message fixed to match the actual threshold (the code checks
        # width < 640, but the old text claimed "below 720p").
        issues.append("β οΈ Width below 640px")
    if v["caption"] and len(v["caption"]) < 20:
        issues.append("β οΈ Caption very short (< 20 chars)")
    return issues


# -----------------------------------------------------------------------------
# TAB 1 β VIDEO INGEST
# -----------------------------------------------------------------------------
def ingest_videos(files):
    """Copy uploaded files into VIDEO_DIR with sanitized names.

    Returns (log text, refreshed gallery markdown).
    """
    if not files:
        return "No files selected.", video_gallery_md()
    log = []
    for f in files:
        raw_name = os.path.basename(f.name)
        stem = _sanitize_name(raw_name)
        dest_name = f"{stem}.mp4"
        dest = os.path.join(VIDEO_DIR, dest_name)
        shutil.copy(f.name, dest)
        meta = _probe_video(dest)
        if meta["ok"]:
            warns = []
            if meta["duration"] > 10:
                warns.append(f"duration {meta['duration']}s > 10s")
            if meta["frames"] < 8:
                warns.append(f"only {meta['frames']} frames")
            warn_str = f" β οΈ {', '.join(warns)}" if warns else ""
            log.append(
                f"β {dest_name} β {meta['duration']}s | "
                f"{meta['fps']}fps | {meta['width']}Γ{meta['height']}{warn_str}"
            )
        else:
            log.append(f"β οΈ {dest_name} β saved (ffprobe unavailable, verify manually)")
    return "\n".join(log), video_gallery_md()


def trim_video(source_path, start_time, end_time, output_stem):
    """Re-encode a [start_time, end_time] slice of source_path into VIDEO_DIR.

    output_stem may be empty/None, in which case "<source>_trimmed" is used.
    Returns (status text, refreshed gallery markdown).
    """
    if not source_path:
        return "β No source file path provided.", video_gallery_md()
    # Guard against None (the old code called .strip() on it directly and
    # raised AttributeError when the textbox value was missing).
    requested = (output_stem or "").strip()
    if requested:
        stem = _sanitize_name(requested)
    else:
        stem = _sanitize_name(Path(source_path).stem) + "_trimmed"
    dest = os.path.join(VIDEO_DIR, f"{stem}.mp4")
    try:
        cmd = [
            "ffmpeg", "-y", "-i", source_path,
            "-ss", str(start_time), "-to", str(end_time),
            "-c:v", "libx264", "-c:a", "aac", dest
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            return f"β FFmpeg error:\n{result.stderr[-500:]}", video_gallery_md()
        meta = _probe_video(dest)
        return (
            f"β Trimmed β {stem}.mp4\n"
            f" Duration: {meta['duration']}s | FPS: {meta['fps']} | "
            f"{meta['width']}Γ{meta['height']} | Frames: {meta['frames']}"
        ), video_gallery_md()
    except Exception as e:
        return f"β Error: {e}", video_gallery_md()


def delete_video(stem):
    """Delete the video and caption files for one stem.

    Returns (status text, refreshed gallery markdown).
    """
    stem = stem.strip()
    if not stem:
        return "β No stem provided.", video_gallery_md()
    msgs = []
    for ext, folder in [(".mp4", VIDEO_DIR), (".txt", CAPTION_DIR)]:
        path = os.path.join(folder, f"{stem}{ext}")
        if os.path.exists(path):
            os.remove(path)
            msgs.append(f"ποΈ Deleted {stem}{ext}")
    if not msgs:
        msgs.append(f"β οΈ No files found for stem: '{stem}'")
    return "\n".join(msgs), video_gallery_md()


def video_gallery_md() -> str:
    """Render the current dataset as a markdown table."""
    videos = _get_all_videos()
    if not videos:
        return "π No videos yet. Upload `.mp4` files above."
    rows = []
    for v in videos:
        cap_icon = "β " if v["has_caption"] else "β οΈ"
        dur = f"{v['duration']}s" if v["duration"] else "?"
        fps = f"{v['fps']}fps" if v["fps"] else "?"
        res = f"{v['width']}Γ{v['height']}" if v["width"] else "?"
        rows.append(f"| `{v['stem']}` | {dur} | {fps} | {res} | {cap_icon} |")
    header = (
        f"### π¬ {len(videos)} Video(s) in Dataset\n"
        "| Stem | Duration | FPS | Resolution | Caption |\n"
        "|------|----------|-----|------------|---------|"
    )
    return header + "\n" + "\n".join(rows)


def get_video_stems():
    """Return the sorted list of video stems currently in VIDEO_DIR."""
    return [Path(f).stem for f in sorted(glob.glob(os.path.join(VIDEO_DIR, "*.mp4")))]


# -----------------------------------------------------------------------------
# TAB 2 β CAPTION STUDIO
# -----------------------------------------------------------------------------
def load_caption_for_stem(stem):
    """Load the caption text for a stem, if one exists.

    Returns (caption text, status message).
    """
    if not stem:
        return "", "Select a video above."
    cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")
    if os.path.exists(cap_path):
        # Context manager + explicit encoding; the original leaked the
        # file handle via a bare open(...).read().
        with open(cap_path, encoding="utf-8") as fh:
            text = fh.read()
        return text, f"π Loaded caption for `{stem}`"
    return "", f"π No caption yet for `{stem}` β write one and save."
def save_caption(stem, caption_text): if not stem: return "β No video selected.", caption_summary_md() if not caption_text.strip(): return "β Caption is empty.", caption_summary_md() cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt") with open(cap_path, "w", encoding="utf-8") as f: f.write(caption_text.strip()) return f"β Saved caption for `{stem}`", caption_summary_md() def build_caption_from_template(subject, action, environment, lighting, camera): parts = [p.strip() for p in [subject, action, environment] if p.strip()] s1 = ("A " + " ".join(parts) + ".") if parts else "" s2 = (lighting.strip() + ".") if lighting.strip() else "" s3 = (camera.strip() + ".") if camera.strip() else "" return " ".join(s for s in [s1, s2, s3] if s) def caption_summary_md() -> str: videos = _get_all_videos() if not videos: return "π No videos loaded yet." rows = [] for v in videos: if v["has_caption"] and v["caption"]: preview = v["caption"][:60].replace("\n", " ") preview += "β¦" if len(v["caption"]) > 60 else "" rows.append(f"| `{v['stem']}` | β | {preview} |") else: rows.append(f"| `{v['stem']}` | β οΈ Missing | β |") paired = sum(1 for v in videos if v["has_caption"] and v["caption"]) header = ( f"### βοΈ Caption Status β {paired}/{len(videos)} complete\n" "| Video | Status | Preview |\n|-------|--------|---------|" ) return header + "\n" + "\n".join(rows) def generate_bulk_template(): videos = _get_all_videos() if not videos: return "π No videos loaded." lines = [] for v in videos: lines.append(f"--- {v['stem']}") lines.append(v["caption"] if v["caption"] else "A [subject] [action] [environment]. [lighting]. 
[camera shot].") lines.append("") return "\n".join(lines) def save_all_bulk_captions(bulk_text: str): if not bulk_text.strip(): return "β No text provided.", caption_summary_md() saved, current_stem, current_lines = [], None, [] for line in bulk_text.splitlines(): if line.startswith("---"): if current_stem and current_lines: cap_path = os.path.join(CAPTION_DIR, f"{current_stem}.txt") with open(cap_path, "w") as f: f.write("\n".join(current_lines).strip()) saved.append(current_stem) current_stem = line.lstrip("- ").strip() current_lines = [] elif current_stem is not None: current_lines.append(line) if current_stem and current_lines: cap_path = os.path.join(CAPTION_DIR, f"{current_stem}.txt") with open(cap_path, "w") as f: f.write("\n".join(current_lines).strip()) saved.append(current_stem) return f"β Saved {len(saved)} caption(s): {', '.join(saved)}", caption_summary_md() # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # TAB 3 β VALIDATOR # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ def run_full_validation(): videos = _get_all_videos() if not videos: return "π No videos to validate. 
Upload files in the Video Ingest tab.", "" all_ok, has_warn, has_err = [], [], [] detail_rows = [] for v in videos: issues = _validation_issues(v) errors = [i for i in issues if i.startswith("β")] warns = [i for i in issues if i.startswith("β οΈ")] if errors: has_err.append(v["stem"]); status = "β Error" elif warns: has_warn.append(v["stem"]); status = "β οΈ Warning" else: all_ok.append(v["stem"]); status = "β Ready" issue_str = " | ".join(issues) if issues else "β" detail_rows.append( f"| `{v['stem']}` | {v['duration']}s | {v['frames']} | {status} | {issue_str} |" ) summary = ( f"### Validation Complete β {len(videos)} video(s)\n\n" f"β **Ready:** {len(all_ok)} | " f"β οΈ **Warnings:** {len(has_warn)} | " f"β **Errors:** {len(has_err)}\n\n" ) if has_err: summary += f"**Must fix before export:** {', '.join(f'`{s}`' for s in has_err)}\n\n" if has_warn: summary += f"**Review recommended:** {', '.join(f'`{s}`' for s in has_warn)}\n\n" if not has_err and not has_warn: summary += "π **All clips are ready to export!**\n\n" header = ( "| Video | Duration | Frames | Status | Issues |\n" "|-------|----------|--------|--------|--------|" ) detail = header + "\n" + "\n".join(detail_rows) return summary, detail def naming_check_report(): videos = _get_all_videos() if not videos: return "π No videos loaded." import re issues = [] for v in videos: stem = v["stem"] if re.search(r"[^a-z0-9_]", stem): issues.append(f"β οΈ `{stem}` β invalid characters (use a-z, 0-9, _ only)") if stem != stem.lower(): issues.append(f"β οΈ `{stem}` β contains uppercase") return "\n".join(issues) if issues else "β All filenames valid." # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # TAB 4 β EXPORT & DOWNLOAD # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ def dataset_summary_md() -> str: videos = _get_all_videos() if not videos: return "π No dataset yet." 
paired = sum(1 for v in videos if v["has_caption"]) total_dur = sum(v["duration"] for v in videos) ready = sum(1 for v in videos if not _validation_issues(v)) return f"""### π Dataset Summary | Metric | Value | |--------|-------| | Total videos | {len(videos)} | | Captioned | {paired} / {len(videos)} | | Ready to encode | {ready} / {len(videos)} | | Total duration | {total_dur:.1f}s ({total_dur/60:.1f} min) | ### Quick Checklist - {"β " if len(videos) >= 10 else "β οΈ"} 10β20 clips (`{len(videos)}` loaded) - {"β " if all(v["duration"] >= 2 for v in videos) else "β οΈ"} All clips β₯ 2 seconds - {"β " if all(v["duration"] <= 10 for v in videos) else "β οΈ"} All clips β€ 10 seconds - {"β " if all(v["frames"] >= 8 for v in videos) else "β"} All clips have β₯ 8 frames - {"β " if paired == len(videos) else "β"} All videos have captions - {"β " if ready == len(videos) else "β οΈ"} No validation errors """ def export_dataset_zip(): videos = _get_all_videos() if not videos: return "β No videos to export.", None, dataset_summary_md() fatal = [ v for v in videos if any(i.startswith("β") for i in _validation_issues(v)) ] if fatal: stems = ", ".join(f"`{v['stem']}`" for v in fatal) return f"β Fix errors first: {stems}", None, dataset_summary_md() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_path = os.path.join(EXPORT_DIR, f"wan21_dataset_{timestamp}.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for v in videos: zf.write(v["video_path"], arcname=f"{v['stem']}.mp4") if v["has_caption"]: zf.write(v["caption_path"], arcname=f"{v['stem']}.txt") size_mb = os.path.getsize(zip_path) / (1024 * 1024) msg = ( f"β Exported {len(videos)} pairs β `wan21_dataset_{timestamp}.zip` " f"({size_mb:.1f} MB)\n\n" f"Click **Download ZIP** below to save it." 
) return msg, zip_path, dataset_summary_md() # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # THEME & CSS # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ THEME = gr.themes.Base( primary_hue=gr.themes.colors.violet, secondary_hue=gr.themes.colors.purple, neutral_hue=gr.themes.colors.slate, font=gr.themes.GoogleFont("Inter"), ).set( body_background_fill="#0a0a0f", body_background_fill_dark="#0a0a0f", block_background_fill="#12121a", block_background_fill_dark="#12121a", block_border_color="#1e1e2e", block_border_color_dark="#1e1e2e", block_label_text_color="#a78bfa", block_label_text_color_dark="#a78bfa", block_title_text_color="#e2e8f0", block_title_text_color_dark="#e2e8f0", body_text_color="#cbd5e1", body_text_color_dark="#cbd5e1", button_primary_background_fill="#7c3aed", button_primary_background_fill_dark="#7c3aed", button_primary_background_fill_hover="#6d28d9", button_primary_background_fill_hover_dark="#6d28d9", button_primary_text_color="#ffffff", button_primary_text_color_dark="#ffffff", button_secondary_background_fill="#1e1e2e", button_secondary_background_fill_dark="#1e1e2e", button_secondary_text_color="#a78bfa", button_secondary_text_color_dark="#a78bfa", input_background_fill="#1a1a2e", input_background_fill_dark="#1a1a2e", input_border_color="#2d2d44", input_border_color_dark="#2d2d44", shadow_drop="0 4px 14px rgba(124, 58, 237, 0.08)", shadow_drop_lg="0 8px 24px rgba(124, 58, 237, 0.12)", ) CSS = """ .gradio-container { max-width: 980px !important; margin: auto; } .main-title { text-align: center; background: linear-gradient(135deg, #7c3aed 0%, #a78bfa 50%, #c4b5fd 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.1rem; font-weight: 800; margin-bottom: 0.15rem; letter-spacing: -0.5px; } .sub-title { text-align: center; color: #64748b; font-size: 0.92rem; margin-bottom: 1rem; } .status-bar { padding: 10px 16px; background: linear-gradient(135deg, 
#1a1a2e, #16162a); border: 1px solid #2d2d44; border-radius: 8px; font-size: 0.9rem; } .tip-box { background: #13131f; border-left: 3px solid #7c3aed; border-radius: 0 8px 8px 0; padding: 10px 14px; margin: 6px 0; font-size: 0.88rem; color: #94a3b8; } """ # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # UI ASSEMBLY # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ def build_ui(): with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) blocks = gr.Blocks(theme=THEME, css=CSS, title="WAN 2.1 Dataset Creator") with blocks: gr.HTML("