Spaces:
Running
Running
| """ | |
| app.py β WAN 2.1 Dataset Creator (HuggingFace Spaces Edition) | |
| Gradio-powered UI for preparing video + caption datasets for WAN 2.1 LoRA training. | |
| Tabs: | |
| 1. π¬ Video Ingest β Upload, trim, validate source videos | |
| 2. βοΈ Caption Studio β Write / template-build captions per clip | |
| 3. β Validator β Check pairs, naming, frame counts | |
| 4. π¦ Export & Handoff β Final dataset summary + zip download | |
| Differences from Colab version: | |
| - No Google Drive sync (not available on HF Spaces) | |
| - Export produces a downloadable .zip instead of Drive copy | |
| - FFmpeg auto-installed at startup if not present | |
| - All paths relative to /tmp/dataset_builder (HF writable space) | |
| - share=False, server_name="0.0.0.0" for Spaces compatibility | |
| """ | |
| import os | |
| import sys | |
| import glob | |
| import json | |
| import shutil | |
| import warnings | |
| import subprocess | |
| import tempfile | |
| import zipfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| # ββ Auto-install FFmpeg on HuggingFace Spaces βββββββββββββββββββββββββββββββββ | |
def _ensure_ffmpeg():
    """Ensure the `ffmpeg` binary is on PATH; try an apt-get install if not.

    On HuggingFace Spaces the container may lack FFmpeg. The original code
    assumed the apt-get install succeeded; without root it fails silently,
    so we re-check afterwards and warn instead of printing a false success.
    """
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        return  # already available, nothing to do
    except (FileNotFoundError, subprocess.CalledProcessError):
        pass
    print("βοΈ FFmpeg not found β installing via apt-get...")
    subprocess.run(["apt-get", "update", "-qq"], capture_output=True)
    subprocess.run(["apt-get", "install", "-y", "-qq", "ffmpeg"], capture_output=True)
    # Verify the install actually worked (apt-get needs root, which Spaces
    # containers may not grant) rather than assuming success.
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        print("β FFmpeg installed.")
    except (FileNotFoundError, subprocess.CalledProcessError):
        print("β οΈ FFmpeg could not be installed β probing/trimming will not work. "
              "Add 'ffmpeg' to packages.txt for this Space.")
_ensure_ffmpeg()
| import gradio as gr | |
| # ββ Path Setup (HF Spaces uses /tmp for writable storage) βββββββββββββββββββββ | |
# Writable workspace on HF Spaces lives under /tmp.
BASE_DIR = "/tmp/dataset_builder"
VIDEO_DIR = os.path.join(BASE_DIR, "videos")
CAPTION_DIR = os.path.join(BASE_DIR, "captions")
EXPORT_DIR = os.path.join(BASE_DIR, "exports")
# Create the whole directory tree up front so later code can assume it exists.
for _subdir in (VIDEO_DIR, CAPTION_DIR, EXPORT_DIR):
    os.makedirs(_subdir, exist_ok=True)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _probe_video(path: str) -> dict: | |
| """Use ffprobe to get video metadata.""" | |
| try: | |
| cmd = [ | |
| "ffprobe", "-v", "quiet", "-print_format", "json", | |
| "-show_streams", "-show_format", path | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=15) | |
| data = json.loads(result.stdout) | |
| vstream = next((s for s in data.get("streams", []) if s.get("codec_type") == "video"), {}) | |
| duration = float(data.get("format", {}).get("duration", 0)) | |
| fps_raw = vstream.get("r_frame_rate", "0/1") | |
| num, den = fps_raw.split("/") | |
| fps = round(float(num) / float(den), 2) if float(den) else 0 | |
| w = int(vstream.get("width", 0)) | |
| h = int(vstream.get("height", 0)) | |
| frames = int(vstream.get("nb_frames", 0)) or int(duration * fps) | |
| return {"duration": round(duration, 2), "fps": fps, "width": w, | |
| "height": h, "frames": frames, "ok": True} | |
| except Exception as e: | |
| return {"duration": 0, "fps": 0, "width": 0, "height": 0, | |
| "frames": 0, "ok": False, "error": str(e)} | |
| def _sanitize_name(name: str) -> str: | |
| """Lowercase, replace non-alphanumeric chars with underscores.""" | |
| import re | |
| name = os.path.splitext(name)[0] | |
| name = name.lower() | |
| name = re.sub(r"[^a-z0-9_]", "_", name) | |
| name = re.sub(r"_+", "_", name).strip("_") | |
| return name | |
def _get_all_videos() -> list:
    """Return a metadata dict for every ``.mp4`` in VIDEO_DIR, sorted by path.

    Each entry carries stem, file paths, caption text/presence, plus the
    ffprobe fields merged in from _probe_video().

    Fix over the original: the caption was read with a bare ``open().read()``
    (handle never closed, no encoding) after an ``exists()`` check β a
    caption deleted in between would crash. Read via EAFP instead.
    """
    videos = []
    for f in sorted(glob.glob(os.path.join(VIDEO_DIR, "*.mp4"))):
        stem = Path(f).stem
        cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")
        try:
            caption = Path(cap_path).read_text(encoding="utf-8").strip()
            has_caption = True
        except FileNotFoundError:
            caption = ""
            has_caption = False
        meta = _probe_video(f)
        videos.append({
            "stem": stem,
            "video_path": f,
            "caption_path": cap_path,
            "has_caption": has_caption,
            "caption": caption,
            **meta,  # duration / fps / width / height / frames / ok
        })
    return videos
def _validation_issues(v: dict) -> list:
    """Return human-readable issue strings for one video-metadata dict.

    Prefix convention: "β" marks a blocking error, "β οΈ" a warning.
    NOTE(review): run_full_validation() and export_dataset_zip() classify
    issues via startswith() on these prefixes β keep them in sync.
    """
    issues = []
    if not v["has_caption"]: issues.append("β Missing caption file")
    if v["duration"] < 1: issues.append("β οΈ Duration < 1s (too short)")
    if v["duration"] > 10: issues.append("β οΈ Duration > 10s (trim recommended)")
    if v["frames"] < 8: issues.append("β Fewer than 8 frames")
    if v["fps"] < 18: issues.append("β οΈ Low FPS (< 18)")
    # NOTE(review): the message says "below 720p" but the check is width < 640 β
    # confirm which threshold is actually intended.
    if v["width"] < 640: issues.append("β οΈ Resolution below 720p")
    if v["caption"] and len(v["caption"]) < 20:
        issues.append("β οΈ Caption very short (< 20 chars)")
    return issues
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TAB 1 β VIDEO INGEST | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def ingest_videos(files):
    """Copy uploaded clips into VIDEO_DIR under sanitised names and probe them.

    *files* is the gr.File payload: tempfile-like objects with ``.name`` on
    older Gradio versions, plain path strings on newer ones β both are
    handled (the original assumed objects and crashed on strings).

    Returns (ingest log text, refreshed gallery markdown).
    """
    if not files:
        return "No files selected.", video_gallery_md()
    log = []
    for f in files:
        src = getattr(f, "name", f)  # object with .name, or already a path str
        raw_name = os.path.basename(src)
        stem = _sanitize_name(raw_name)
        dest_name = f"{stem}.mp4"
        # NOTE: re-uploading a clip with the same sanitised stem replaces it.
        dest = os.path.join(VIDEO_DIR, dest_name)
        shutil.copy(src, dest)
        meta = _probe_video(dest)
        if meta["ok"]:
            warns = []
            if meta["duration"] > 10:
                warns.append(f"duration {meta['duration']}s > 10s")
            if meta["frames"] < 8:
                warns.append(f"only {meta['frames']} frames")
            warn_str = f" β οΈ {', '.join(warns)}" if warns else ""
            log.append(
                f"β {dest_name} β {meta['duration']}s | "
                f"{meta['fps']}fps | {meta['width']}Γ{meta['height']}{warn_str}"
            )
        else:
            log.append(f"β οΈ {dest_name} β saved (ffprobe unavailable, verify manually)")
    return "\n".join(log), video_gallery_md()
def trim_video(source_path, start_time, end_time, output_stem):
    """Cut [start_time, end_time] out of *source_path* with FFmpeg.

    The trimmed clip is re-encoded (libx264/aac) into VIDEO_DIR under
    *output_stem*, or "<source>_trimmed" when no stem is given.
    Returns (log message, refreshed gallery markdown).

    Fixes over the original: validates the time range up front instead of
    letting FFmpeg fail cryptically, and tolerates output_stem being None
    (the original called .strip() on it unconditionally).
    """
    if not source_path:
        return "β No source file path provided.", video_gallery_md()
    try:
        if float(end_time) <= float(start_time):
            return "β End time must be greater than start time.", video_gallery_md()
    except (TypeError, ValueError):
        return "β Start/end times must be numbers.", video_gallery_md()
    stem = (_sanitize_name(output_stem) if output_stem and output_stem.strip()
            else _sanitize_name(Path(source_path).stem) + "_trimmed")
    dest = os.path.join(VIDEO_DIR, f"{stem}.mp4")
    try:
        cmd = [
            "ffmpeg", "-y", "-i", source_path,
            "-ss", str(start_time), "-to", str(end_time),
            "-c:v", "libx264", "-c:a", "aac", dest
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            # Show only the tail of stderr β FFmpeg logs are long.
            return f"β FFmpeg error:\n{result.stderr[-500:]}", video_gallery_md()
        meta = _probe_video(dest)
        return (
            f"β Trimmed β {stem}.mp4\n"
            f" Duration: {meta['duration']}s | FPS: {meta['fps']} | "
            f"{meta['width']}Γ{meta['height']} | Frames: {meta['frames']}"
        ), video_gallery_md()
    except Exception as e:
        return f"β Error: {e}", video_gallery_md()
def delete_video(stem):
    """Remove the video file and its caption (if present) for *stem*."""
    stem = stem.strip()
    if not stem:
        return "β No stem provided.", video_gallery_md()
    removed = []
    # Video first, then caption β mirrors how the pair was created.
    targets = (
        (f"{stem}.mp4", os.path.join(VIDEO_DIR, f"{stem}.mp4")),
        (f"{stem}.txt", os.path.join(CAPTION_DIR, f"{stem}.txt")),
    )
    for display_name, target in targets:
        if os.path.exists(target):
            os.remove(target)
            removed.append(f"ποΈ Deleted {display_name}")
    if not removed:
        removed.append(f"β οΈ No files found for stem: '{stem}'")
    return "\n".join(removed), video_gallery_md()
def video_gallery_md() -> str:
    """Render the current video list as a markdown table."""
    videos = _get_all_videos()
    if not videos:
        return "π No videos yet. Upload `.mp4` files above."
    table_rows = [
        "| `{}` | {} | {} | {} | {} |".format(
            v["stem"],
            f"{v['duration']}s" if v["duration"] else "?",
            f"{v['fps']}fps" if v["fps"] else "?",
            f"{v['width']}Γ{v['height']}" if v["width"] else "?",
            "β " if v["has_caption"] else "β οΈ",
        )
        for v in videos
    ]
    parts = [
        f"### π¬ {len(videos)} Video(s) in Dataset",
        "| Stem | Duration | FPS | Resolution | Caption |",
        "|------|----------|-----|------------|---------|",
        *table_rows,
    ]
    return "\n".join(parts)
def get_video_stems():
    """Return the stems of all ingested .mp4 files, in sorted path order."""
    pattern = os.path.join(VIDEO_DIR, "*.mp4")
    return [Path(p).stem for p in sorted(glob.glob(pattern))]
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TAB 2 β CAPTION STUDIO | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def load_caption_for_stem(stem):
    """Load the saved caption for *stem*; returns (caption text, status md).

    Fix over the original: the caption was read with a bare ``open().read()``
    (handle never closed, no explicit encoding) guarded by exists(); use
    EAFP with Path.read_text instead.
    """
    if not stem:
        return "", "Select a video above."
    cap_path = Path(CAPTION_DIR) / f"{stem}.txt"
    try:
        return cap_path.read_text(encoding="utf-8"), f"π Loaded caption for `{stem}`"
    except FileNotFoundError:
        return "", f"π No caption yet for `{stem}` β write one and save."
def save_caption(stem, caption_text):
    """Persist *caption_text* as the .txt caption for *stem*.

    Returns (status message, refreshed caption summary markdown).
    """
    if not stem:
        return "β No video selected.", caption_summary_md()
    text = caption_text.strip()
    if not text:
        return "β Caption is empty.", caption_summary_md()
    target = os.path.join(CAPTION_DIR, f"{stem}.txt")
    with open(target, "w", encoding="utf-8") as fh:
        fh.write(text)
    return f"β Saved caption for `{stem}`", caption_summary_md()
def build_caption_from_template(subject, action, environment, lighting, camera):
    """Assemble a draft caption, sentence by sentence, from template fields.

    Sentence 1 combines subject/action/environment ("A ... ."); lighting and
    camera each become their own trailing sentence when non-empty.
    """
    core = [field.strip() for field in (subject, action, environment) if field.strip()]
    sentences = []
    if core:
        sentences.append("A " + " ".join(core) + ".")
    for extra in (lighting, camera):
        if extra.strip():
            sentences.append(extra.strip() + ".")
    return " ".join(sentences)
def caption_summary_md() -> str:
    """Render a markdown table of caption-completion status per video."""
    videos = _get_all_videos()
    if not videos:
        return "π No videos loaded yet."
    done = 0
    body = []
    for v in videos:
        if v["has_caption"] and v["caption"]:
            done += 1
            snippet = v["caption"][:60].replace("\n", " ")
            if len(v["caption"]) > 60:
                snippet += "β¦"
            body.append(f"| `{v['stem']}` | β | {snippet} |")
        else:
            body.append(f"| `{v['stem']}` | β οΈ Missing | β |")
    head = (
        f"### βοΈ Caption Status β {done}/{len(videos)} complete\n"
        "| Video | Status | Preview |\n|-------|--------|---------|"
    )
    return head + "\n" + "\n".join(body)
def generate_bulk_template():
    """Produce the '--- stem / caption' bulk-editing text for every video."""
    videos = _get_all_videos()
    if not videos:
        return "π No videos loaded."
    placeholder = "A [subject] [action] [environment]. [lighting]. [camera shot]."
    chunks = []
    for v in videos:
        chunks.append(f"--- {v['stem']}")
        # Pre-fill with the existing caption, or a fill-in-the-blanks template.
        chunks.append(v["caption"] or placeholder)
        chunks.append("")
    return "\n".join(chunks)
def save_all_bulk_captions(bulk_text: str):
    """Parse the bulk-editor format ('--- stem' headers) and save each caption.

    Returns (status message, refreshed caption summary markdown).

    Fixes over the original: the identical write-out logic was duplicated
    (loop body + tail) β factored into one helper β and files are now
    written with encoding="utf-8", consistent with save_caption().
    """
    if not bulk_text.strip():
        return "β No text provided.", caption_summary_md()

    def _flush(stem, lines, saved):
        # Write one caption file; shared by the loop body and the final flush.
        if stem and lines:
            cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")
            with open(cap_path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines).strip())
            saved.append(stem)

    saved, current_stem, current_lines = [], None, []
    for line in bulk_text.splitlines():
        if line.startswith("---"):
            _flush(current_stem, current_lines, saved)
            current_stem = line.lstrip("- ").strip()
            current_lines = []
        elif current_stem is not None:
            # Text before the first '---' header is intentionally ignored.
            current_lines.append(line)
    _flush(current_stem, current_lines, saved)
    return f"β Saved {len(saved)} caption(s): {', '.join(saved)}", caption_summary_md()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TAB 3 β VALIDATOR | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def run_full_validation():
    """Validate every clip; return (summary markdown, detail table markdown).

    Severity is derived from the message prefixes emitted by
    _validation_issues(): "β" β error, "β οΈ" β warning. Keep those
    prefixes in sync with that function.
    """
    videos = _get_all_videos()
    if not videos:
        return "π No videos to validate. Upload files in the Video Ingest tab.", ""
    all_ok, has_warn, has_err = [], [], []
    detail_rows = []
    for v in videos:
        issues = _validation_issues(v)
        # Classify each issue by its severity prefix (see _validation_issues).
        errors = [i for i in issues if i.startswith("β")]
        warns = [i for i in issues if i.startswith("β οΈ")]
        if errors:
            has_err.append(v["stem"]); status = "β Error"
        elif warns:
            has_warn.append(v["stem"]); status = "β οΈ Warning"
        else:
            all_ok.append(v["stem"]); status = "β Ready"
        issue_str = " | ".join(issues) if issues else "β"
        detail_rows.append(
            f"| `{v['stem']}` | {v['duration']}s | {v['frames']} | {status} | {issue_str} |"
        )
    summary = (
        f"### Validation Complete β {len(videos)} video(s)\n\n"
        f"β **Ready:** {len(all_ok)} | "
        f"β οΈ **Warnings:** {len(has_warn)} | "
        f"β **Errors:** {len(has_err)}\n\n"
    )
    if has_err:
        summary += f"**Must fix before export:** {', '.join(f'`{s}`' for s in has_err)}\n\n"
    if has_warn:
        summary += f"**Review recommended:** {', '.join(f'`{s}`' for s in has_warn)}\n\n"
    if not has_err and not has_warn:
        summary += "π **All clips are ready to export!**\n\n"
    header = (
        "| Video | Duration | Frames | Status | Issues |\n"
        "|-------|----------|--------|--------|--------|"
    )
    detail = header + "\n" + "\n".join(detail_rows)
    return summary, detail
def naming_check_report():
    """Check every stem against the lowercase_underscore naming rule."""
    videos = _get_all_videos()
    if not videos:
        return "π No videos loaded."
    import re
    problems = []
    for v in videos:
        name = v["stem"]
        if re.search(r"[^a-z0-9_]", name):
            problems.append(f"β οΈ `{name}` β invalid characters (use a-z, 0-9, _ only)")
        if name != name.lower():
            problems.append(f"β οΈ `{name}` β contains uppercase")
    if not problems:
        return "β All filenames valid."
    return "\n".join(problems)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TAB 4 β EXPORT & DOWNLOAD | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def dataset_summary_md() -> str:
    """Render the dataset-wide summary table plus a readiness checklist.

    "Ready to encode" counts videos for which _validation_issues() returns
    nothing at all (no errors and no warnings).
    """
    videos = _get_all_videos()
    if not videos:
        return "π No dataset yet."
    paired = sum(1 for v in videos if v["has_caption"])
    total_dur = sum(v["duration"] for v in videos)
    ready = sum(1 for v in videos if not _validation_issues(v))
    return f"""### π Dataset Summary
| Metric | Value |
|--------|-------|
| Total videos | {len(videos)} |
| Captioned | {paired} / {len(videos)} |
| Ready to encode | {ready} / {len(videos)} |
| Total duration | {total_dur:.1f}s ({total_dur/60:.1f} min) |
### Quick Checklist
- {"β " if len(videos) >= 10 else "β οΈ"} 10β20 clips (`{len(videos)}` loaded)
- {"β " if all(v["duration"] >= 2 for v in videos) else "β οΈ"} All clips β₯ 2 seconds
- {"β " if all(v["duration"] <= 10 for v in videos) else "β οΈ"} All clips β€ 10 seconds
- {"β " if all(v["frames"] >= 8 for v in videos) else "β"} All clips have β₯ 8 frames
- {"β " if paired == len(videos) else "β"} All videos have captions
- {"β " if ready == len(videos) else "β οΈ"} No validation errors
"""
def export_dataset_zip():
    """Zip every video+caption pair into EXPORT_DIR, refusing on errors.

    Returns (status markdown, zip path or None, refreshed summary markdown).
    Export is blocked while any clip has a blocking ("β"-prefixed) issue.
    """
    videos = _get_all_videos()
    if not videos:
        return "β No videos to export.", None, dataset_summary_md()
    blocked = [v for v in videos
               if any(issue.startswith("β") for issue in _validation_issues(v))]
    if blocked:
        names = ", ".join(f"`{v['stem']}`" for v in blocked)
        return f"β Fix errors first: {names}", None, dataset_summary_md()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_name = f"wan21_dataset_{stamp}.zip"
    zip_path = os.path.join(EXPORT_DIR, zip_name)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for v in videos:
            zf.write(v["video_path"], arcname=f"{v['stem']}.mp4")
            if v["has_caption"]:
                zf.write(v["caption_path"], arcname=f"{v['stem']}.txt")
    size_mb = os.path.getsize(zip_path) / (1024 * 1024)
    msg = (
        f"β Exported {len(videos)} pairs β `{zip_name}` "
        f"({size_mb:.1f} MB)\n\n"
        f"Click **Download ZIP** below to save it."
    )
    return msg, zip_path, dataset_summary_md()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # THEME & CSS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Dark violet theme. Every *_dark value mirrors its light counterpart so the
# palette looks identical whichever color scheme the visitor's browser requests.
THEME = gr.themes.Base(
    primary_hue=gr.themes.colors.violet,
    secondary_hue=gr.themes.colors.purple,
    neutral_hue=gr.themes.colors.slate,
    font=gr.themes.GoogleFont("Inter"),
).set(
    body_background_fill="#0a0a0f",
    body_background_fill_dark="#0a0a0f",
    block_background_fill="#12121a",
    block_background_fill_dark="#12121a",
    block_border_color="#1e1e2e",
    block_border_color_dark="#1e1e2e",
    block_label_text_color="#a78bfa",
    block_label_text_color_dark="#a78bfa",
    block_title_text_color="#e2e8f0",
    block_title_text_color_dark="#e2e8f0",
    body_text_color="#cbd5e1",
    body_text_color_dark="#cbd5e1",
    button_primary_background_fill="#7c3aed",
    button_primary_background_fill_dark="#7c3aed",
    button_primary_background_fill_hover="#6d28d9",
    button_primary_background_fill_hover_dark="#6d28d9",
    button_primary_text_color="#ffffff",
    button_primary_text_color_dark="#ffffff",
    button_secondary_background_fill="#1e1e2e",
    button_secondary_background_fill_dark="#1e1e2e",
    button_secondary_text_color="#a78bfa",
    button_secondary_text_color_dark="#a78bfa",
    input_background_fill="#1a1a2e",
    input_background_fill_dark="#1a1a2e",
    input_border_color="#2d2d44",
    input_border_color_dark="#2d2d44",
    shadow_drop="0 4px 14px rgba(124, 58, 237, 0.08)",
    shadow_drop_lg="0 8px 24px rgba(124, 58, 237, 0.12)",
)
| CSS = """ | |
| .gradio-container { max-width: 980px !important; margin: auto; } | |
| .main-title { | |
| text-align: center; | |
| background: linear-gradient(135deg, #7c3aed 0%, #a78bfa 50%, #c4b5fd 100%); | |
| -webkit-background-clip: text; | |
| -webkit-text-fill-color: transparent; | |
| font-size: 2.1rem; | |
| font-weight: 800; | |
| margin-bottom: 0.15rem; | |
| letter-spacing: -0.5px; | |
| } | |
| .sub-title { | |
| text-align: center; | |
| color: #64748b; | |
| font-size: 0.92rem; | |
| margin-bottom: 1rem; | |
| } | |
| .status-bar { | |
| padding: 10px 16px; | |
| background: linear-gradient(135deg, #1a1a2e, #16162a); | |
| border: 1px solid #2d2d44; | |
| border-radius: 8px; | |
| font-size: 0.9rem; | |
| } | |
| .tip-box { | |
| background: #13131f; | |
| border-left: 3px solid #7c3aed; | |
| border-radius: 0 8px 8px 0; | |
| padding: 10px 14px; | |
| margin: 6px 0; | |
| font-size: 0.88rem; | |
| color: #94a3b8; | |
| } | |
| """ | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UI ASSEMBLY | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def build_ui():
    """Assemble the 4-tab Gradio Blocks UI and wire all event handlers.

    Returns the gr.Blocks app (not yet launched). Callables passed as
    Markdown ``value=`` (dataset_summary_md, video_gallery_md, ...) are
    re-evaluated by Gradio, so those status panels refresh on page load.
    """
    with warnings.catch_warnings():
        # Gradio theme/component kwargs emit DeprecationWarning on some
        # versions; silence them during construction only.
        warnings.simplefilter("ignore", DeprecationWarning)
        blocks = gr.Blocks(theme=THEME, css=CSS, title="WAN 2.1 Dataset Creator")
    with blocks:
        gr.HTML("<div class='main-title'>π¬ WAN 2.1 Dataset Creator</div>")
        gr.HTML("<div class='sub-title'>Prepare Β· Caption Β· Validate Β· Export β WAN 2.1 Latent Cacher</div>")
        gr.Markdown(value=dataset_summary_md, elem_classes=["status-bar"])
        # ββ TAB 1: VIDEO INGEST ββ upload, trim, delete clips
        with gr.Tab("π¬ Video Ingest", id="ingest"):
            gr.Markdown(
                "### Step 1 & 2 β Upload & Trim Source Clips\n"
                "Upload raw `.mp4` files. Filenames are auto-sanitised to `lowercase_underscore`."
            )
            gr.HTML("<div class='tip-box'>π‘ <b>Sweet spot:</b> 3β5 second clips at 720p+, 24β30fps. "
                    "Aim for 10β20 clips per LoRA concept.</div>")
            gr.Markdown("#### π€ Upload Videos")
            with gr.Row():
                upload_files = gr.File(
                    label="Drop .mp4 files here",
                    file_count="multiple",
                    file_types=[".mp4"],
                    scale=3,
                )
                upload_btn = gr.Button("β¬οΈ Ingest Files", variant="primary", scale=1)
            ingest_log = gr.Textbox(label="Ingest Log", lines=5, interactive=False)
            gr.Markdown("---\n#### βοΈ Trim a Clip with FFmpeg")
            with gr.Row():
                trim_source = gr.Textbox(
                    label="Source path (full path in /tmp/dataset_builder/videos/)",
                    placeholder="/tmp/dataset_builder/videos/raw_footage.mp4",
                    scale=3,
                )
                trim_stem = gr.Textbox(label="Output stem name", placeholder="clip_01", scale=1)
            with gr.Row():
                trim_start = gr.Number(label="Start (seconds)", value=0, minimum=0)
                trim_end = gr.Number(label="End (seconds)", value=5, minimum=0)
            trim_btn = gr.Button("βοΈ Trim & Save", variant="primary")
            trim_log = gr.Textbox(label="Trim Log", lines=3, interactive=False)
            gr.Markdown("---\n#### ποΈ Remove a Clip")
            with gr.Row():
                del_stem = gr.Textbox(label="Stem to delete", placeholder="clip_01", scale=3)
                del_btn = gr.Button("ποΈ Delete", variant="secondary", scale=1)
            del_log = gr.Textbox(label="Delete Log", lines=2, interactive=False)
            gr.Markdown("---")
            refresh_gallery_btn = gr.Button("π Refresh Gallery", variant="secondary")
            gallery_md = gr.Markdown(value=video_gallery_md)
            # Ingest fires on both the button and the drop event; every
            # mutation also refreshes the gallery table.
            upload_btn.click(fn=ingest_videos, inputs=[upload_files], outputs=[ingest_log, gallery_md])
            upload_files.upload(fn=ingest_videos, inputs=[upload_files], outputs=[ingest_log, gallery_md])
            trim_btn.click(fn=trim_video, inputs=[trim_source, trim_start, trim_end, trim_stem], outputs=[trim_log, gallery_md])
            del_btn.click(fn=delete_video, inputs=[del_stem], outputs=[del_log, gallery_md])
            refresh_gallery_btn.click(fn=video_gallery_md, outputs=[gallery_md])
        # ββ TAB 2: CAPTION STUDIO ββ per-clip editor + bulk editor
        with gr.Tab("βοΈ Caption Studio", id="captions"):
            gr.Markdown(
                "### Step 3 β Write Captions\n"
                "Every `.mp4` needs a matching `.txt` caption describing subject, action, "
                "environment, lighting, and camera."
            )
            gr.HTML("<div class='tip-box'>π‘ Good captions: <b>subject + action + environment + "
                    "lighting + camera</b>. 1β3 sentences. Specific and consistent.</div>")
            with gr.Tabs():
                with gr.Tab("ποΈ Per-Clip Editor"):
                    with gr.Row():
                        stem_dropdown = gr.Dropdown(
                            label="Select Video",
                            choices=get_video_stems(),
                            scale=3,
                        )
                        refresh_stems_btn = gr.Button("π", scale=1, variant="secondary")
                    load_status = gr.Markdown("")
                    caption_box = gr.Textbox(
                        label="Caption Text",
                        lines=5,
                        placeholder=(
                            "A fluffy orange tabby cat playing with a red ball on a wooden floor.\n"
                            "Warm natural sunlight streams through a window.\n"
                            "Low angle shot with shallow depth of field."
                        ),
                    )
                    gr.Markdown("#### π§© Template Builder")
                    gr.HTML("<div class='tip-box'>π‘ Fill the fields and click Build to generate a caption draft, "
                            "then edit it freely before saving.</div>")
                    with gr.Row():
                        t_subject = gr.Textbox(label="Subject", placeholder="fluffy orange tabby cat")
                        t_action = gr.Textbox(label="Action", placeholder="playing with a red ball")
                    with gr.Row():
                        t_env = gr.Textbox(label="Environment", placeholder="on a wooden floor in a living room")
                        t_lighting = gr.Textbox(label="Lighting", placeholder="warm natural sunlight from a window")
                        t_camera = gr.Textbox(label="Camera / Shot", placeholder="low angle shot, shallow depth of field")
                    with gr.Row():
                        build_btn = gr.Button("π§© Build from Template", variant="secondary")
                        save_btn = gr.Button("πΎ Save Caption", variant="primary")
                    save_status = gr.Markdown("")
                    cap_summary = gr.Markdown(value=caption_summary_md)
                    # Selecting a video loads its caption into the editor.
                    stem_dropdown.change(
                        fn=load_caption_for_stem,
                        inputs=[stem_dropdown],
                        outputs=[caption_box, load_status],
                    )
                    # Rebuild the dropdown choices from disk on demand.
                    refresh_stems_btn.click(
                        fn=lambda: gr.Dropdown(choices=get_video_stems()),
                        outputs=[stem_dropdown],
                    )
                    build_btn.click(
                        fn=build_caption_from_template,
                        inputs=[t_subject, t_action, t_env, t_lighting, t_camera],
                        outputs=[caption_box],
                    )
                    save_btn.click(
                        fn=save_caption,
                        inputs=[stem_dropdown, caption_box],
                        outputs=[save_status, cap_summary],
                    )
                with gr.Tab("π Bulk Editor"):
                    gr.Markdown(
                        "Edit all captions at once. Format:\n"
                        "```\n--- stem_name\nYour caption text here.\n\n--- next_stem\n...\n```\n"
                        "Click **Load Template** to pre-fill existing captions."
                    )
                    with gr.Row():
                        load_bulk_btn = gr.Button("π Load Template", variant="secondary")
                        save_bulk_btn = gr.Button("πΎ Save All", variant="primary")
                    bulk_box = gr.Textbox(label="Bulk Caption Editor", lines=22,
                                          placeholder="Click 'Load Template' to startβ¦")
                    bulk_status = gr.Markdown("")
                    bulk_summary = gr.Markdown(value=caption_summary_md)
                    load_bulk_btn.click(fn=generate_bulk_template, outputs=[bulk_box])
                    save_bulk_btn.click(
                        fn=save_all_bulk_captions,
                        inputs=[bulk_box],
                        outputs=[bulk_status, bulk_summary],
                    )
        # ββ TAB 3: VALIDATOR ββ full validation + naming-only check
        with gr.Tab("β Validator", id="validate"):
            gr.Markdown(
                "### Step 4 β Validate Your Dataset\n"
                "Check every clip for duration, frame count, resolution, naming, and captions."
            )
            with gr.Row():
                validate_btn = gr.Button("βΆ Run Full Validation", variant="primary", scale=2)
                naming_btn = gr.Button("π€ Check Naming Only", variant="secondary", scale=1)
            validation_summary = gr.Markdown("")
            naming_report = gr.Markdown("")
            validation_detail = gr.Markdown("")
            gr.Markdown("---\n#### π Requirements Reference")
            gr.Markdown("""
| Parameter | Requirement |
|-----------|-------------|
| Format | `.mp4` only |
| Duration | 2β10 seconds (sweet spot: 3β5s) |
| FPS | 24β30 fps recommended |
| Resolution | 720p+ (auto-resized to 480Γ832) |
| Min frames | 8 frames minimum |
| Caption | Required, 1β3 sentences |
| Filename | Lowercase, underscores, no spaces |
""")
            validate_btn.click(fn=run_full_validation, outputs=[validation_summary, validation_detail])
            naming_btn.click(fn=naming_check_report, outputs=[naming_report])
        # ββ TAB 4: EXPORT ββ zip the dataset for download
        with gr.Tab("π¦ Export & Download", id="export"):
            gr.Markdown(
                "### Step 5 β Export Dataset\n"
                "Packages all validated video + caption pairs into a single `.zip` for download."
            )
            gr.Markdown(value=dataset_summary_md, label="Dataset Summary")
            gr.HTML("<div class='tip-box'>π‘ Fix all β validation errors before exporting. "
                    "β οΈ warnings are safe to ignore.</div>")
            export_btn = gr.Button("π¦ Build & Download ZIP", variant="primary", size="lg")
            export_status = gr.Markdown("")
            download_file = gr.File(label="β¬οΈ Download ZIP", visible=True)
            export_summary = gr.Markdown(value=dataset_summary_md)
            gr.Markdown("---\n#### π Dataset Checklist")
            gr.Markdown("""
```
DATASET CHECKLIST
βββββββββββββββββββββββββββββββββββββββββ
β‘ 10β20 clips, each 3β5 seconds
β‘ All .mp4 format, 720p+, 24β30 fps
β‘ Matching .txt caption for EVERY video
β‘ Filenames: lowercase, underscores, no spaces
β‘ Captions: 1β3 sentences β
subject Β· action Β· environment Β· lighting Β· camera
β‘ No watermarks, black frames, or blurry footage
β‘ All pairs validated β in Validator tab
```
""")
            gr.Markdown("---\n#### β οΈ Common Issues")
            gr.Markdown("""
| Problem | Solution |
|---------|----------|
| "Missing caption" | Create `.txt` with exact same stem as `.mp4` |
| "Only N frames, need 8" | Clip too short β use β₯ 1 second at 24fps |
| Blurry latent outputs | Source too low-res β use 720p+ footage |
| LoRA overfits | More variety β different angles, lighting, backgrounds |
| LoRA doesn't learn concept | Captions too vague β be more specific |
""")
            export_btn.click(
                fn=export_dataset_zip,
                outputs=[export_status, download_file, export_summary],
            )
        # ββ FOOTER ββ static attribution line
        gr.HTML(
            "<div style='text-align:center;color:#475569;font-size:0.8rem;margin-top:1rem;'>"
            "WAN 2.1 Dataset Creator β’ HuggingFace Spaces Edition β’ video pairs β safetensors"
            "</div>"
        )
    return blocks
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAUNCH (HuggingFace Spaces β no share=True needed) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| app = build_ui() | |
| app.queue() | |
| app.launch(share=True) | |