# Dataset-auto / app.py
# varunm2004's picture
# Update app.py
# ed9dbbe verified
"""
app.py — WAN 2.1 Dataset Creator (HuggingFace Spaces Edition)
Gradio-powered UI for preparing video + caption datasets for WAN 2.1 LoRA training.
Tabs:
  1. 🎬 Video Ingest — Upload, trim, validate source videos
  2. ✍️ Caption Studio — Write / template-build captions per clip
  3. ✅ Validator — Check pairs, naming, frame counts
  4. 📦 Export & Handoff — Final dataset summary + zip download
Differences from Colab version:
- No Google Drive sync (not available on HF Spaces)
- Export produces a downloadable .zip instead of Drive copy
- FFmpeg auto-installed at startup if not present
- All paths relative to /tmp/dataset_builder (HF writable space)
- share=False, server_name="0.0.0.0" for Spaces compatibility
"""
import os
import sys
import glob
import json
import shutil
import warnings
import subprocess
import tempfile
import zipfile
from pathlib import Path
from datetime import datetime
# ── Auto-install FFmpeg on HuggingFace Spaces ─────────────────────────────────
def _ensure_ffmpeg():
    """Make sure the ``ffmpeg`` binary is callable, installing it if needed.

    Runs ``ffmpeg -version``. When the binary is missing or exits non-zero,
    tries ``apt-get update`` followed by ``apt-get install ffmpeg`` and
    reports whether the install actually succeeded.

    Failures are printed, never raised, so a missing ``apt-get`` or a
    refused install cannot abort module import.
    """
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
        return
    except (FileNotFoundError, subprocess.CalledProcessError):
        print("βš™οΈ FFmpeg not found β€” installing via apt-get...")

    try:
        # The index refresh is best-effort; the install result is what matters.
        subprocess.run(["apt-get", "update", "-qq"], capture_output=True)
        install = subprocess.run(
            ["apt-get", "install", "-y", "-qq", "ffmpeg"],
            capture_output=True,
            text=True,
        )
    except OSError as exc:
        # apt-get itself is absent or not executable on this image.
        print(f"❌ FFmpeg install failed: {exc}")
        return

    if install.returncode != 0:
        print(
            f"❌ FFmpeg install failed (apt-get exit code {install.returncode}):\n"
            f"{install.stderr[-500:]}"
        )
        return
    print("βœ… FFmpeg installed.")
# Executed at import time, before gradio is imported and the UI is built,
# so the FFmpeg check/install happens once at startup.
_ensure_ffmpeg()
import gradio as gr
# ── Path Setup (HF Spaces uses /tmp for writable storage) ─────────────────────
# Root of all working data; the three sub-folders hold uploaded clips,
# their caption files and generated zip archives respectively.
BASE_DIR = "/tmp/dataset_builder"
VIDEO_DIR, CAPTION_DIR, EXPORT_DIR = (
    os.path.join(BASE_DIR, leaf) for leaf in ("videos", "captions", "exports")
)
# Create the folders up front so handlers can read/write without checking.
for d in (VIDEO_DIR, CAPTION_DIR, EXPORT_DIR):
    os.makedirs(d, exist_ok=True)
# ═════════════════════════════════════════════════════════════════════════════
# HELPERS
# ═════════════════════════════════════════════════════════════════════════════
def _probe_video(path: str) -> dict:
    """Collect basic metadata for *path* by running ``ffprobe``.

    Args:
        path: Location of the media file to inspect.

    Returns:
        A dict with ``duration`` (seconds, rounded to 2 dp), ``fps``,
        ``width``, ``height``, ``frames`` and ``ok``. When probing fails
        (ffprobe missing, timeout, non-zero exit, unparsable output, or no
        video stream) the numeric fields are zero, ``ok`` is False and the
        reason is stored under ``error``. This function never raises.
    """

    def _failed(reason) -> dict:
        return {"duration": 0, "fps": 0, "width": 0, "height": 0,
                "frames": 0, "ok": False, "error": str(reason)}

    cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-show_format", path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
        # A failed probe can still emit parseable (empty) JSON, so the exit
        # code must be checked explicitly rather than relying on json.loads.
        if result.returncode != 0:
            return _failed(f"ffprobe exited with code {result.returncode}")

        data = json.loads(result.stdout)
        vstream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
            None,
        )
        if vstream is None:
            return _failed("no video stream found")

        duration = float(data.get("format", {}).get("duration", 0))

        num, den = vstream.get("r_frame_rate", "0/1").split("/")
        fps = round(float(num) / float(den), 2) if float(den) else 0

        width = int(vstream.get("width", 0))
        height = int(vstream.get("height", 0))

        # Prefer the stream's own frame count; estimate from duration * fps
        # when it is absent, zero, or not numeric.
        try:
            frames = int(vstream.get("nb_frames", 0))
        except (TypeError, ValueError):
            frames = 0
        frames = frames or int(duration * fps)
    except Exception as e:  # best-effort probe: report the failure, never raise
        return _failed(e)

    return {"duration": round(duration, 2), "fps": fps, "width": width,
            "height": height, "frames": frames, "ok": True}
def _sanitize_name(name: str) -> str:
    """Turn a filename into a safe ``lowercase_underscore`` stem.

    The extension is dropped, the rest is lowercased, every run of characters
    outside ``a-z0-9_`` becomes a single underscore, and leading/trailing
    underscores are trimmed.

    Returns:
        The sanitized stem, or ``"untitled"`` when nothing usable remains.
        An empty stem would otherwise produce a hidden ``.mp4`` file that the
        ``*.mp4`` glob used elsewhere never lists.
    """
    import re

    stem = os.path.splitext(name)[0].lower()
    stem = re.sub(r"[^a-z0-9_]+", "_", stem)
    stem = re.sub(r"_+", "_", stem).strip("_")
    return stem or "untitled"
def _get_all_videos() -> list:
    """Return one record per ``*.mp4`` file in VIDEO_DIR, sorted by path.

    Each record holds ``stem``, ``video_path``, ``caption_path``,
    ``has_caption``, ``caption`` (stripped text, ``""`` when there is no
    caption file) plus every key returned by ``_probe_video``.
    """
    videos = []
    for video_path in sorted(glob.glob(os.path.join(VIDEO_DIR, "*.mp4"))):
        stem = Path(video_path).stem
        cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")

        # Single open instead of exists()+open(): no race, and the handle is
        # closed deterministically. UTF-8 matches what save_caption writes.
        try:
            with open(cap_path, encoding="utf-8", errors="replace") as fh:
                caption = fh.read().strip()
            has_caption = True
        except FileNotFoundError:
            caption = ""
            has_caption = False

        videos.append({
            "stem": stem,
            "video_path": video_path,
            "caption_path": cap_path,
            "has_caption": has_caption,
            "caption": caption,
            **_probe_video(video_path),
        })
    return videos
def _validation_issues(v: dict) -> list:
    """Return the list of problems found for one video record.

    Args:
        v: Record with the keys ``has_caption``, ``caption``, ``duration``,
            ``frames``, ``fps`` and ``width``.

    Returns:
        Human-readable messages. Those starting with ❌ are treated as
        blocking by the export step; those starting with ⚠️ are advisory.
        An empty list means the clip is clean.
    """
    issues = []
    caption = v["caption"]

    if not v["has_caption"]:
        issues.append("❌ Missing caption file")
    elif not caption:
        # A caption file that exists but holds no text is as unusable as a
        # missing one; the caption summary also counts it as incomplete.
        issues.append("❌ Caption file is empty")

    if v["duration"] < 1:
        issues.append("⚠️ Duration < 1s (too short)")
    if v["duration"] > 10:
        issues.append("⚠️ Duration > 10s (trim recommended)")
    if v["frames"] < 8:
        issues.append("❌ Fewer than 8 frames")
    if v["fps"] < 18:
        issues.append("⚠️ Low FPS (< 18)")
    # NOTE(review): the threshold is 640 px of width although the message
    # says 720p; confirm the intended cutoff.
    if v["width"] < 640:
        issues.append("⚠️ Resolution below 720p")
    if caption and len(caption) < 20:
        issues.append("⚠️ Caption very short (< 20 chars)")
    return issues
# ═════════════════════════════════════════════════════════════════════════════
# TAB 1 — VIDEO INGEST
# ═════════════════════════════════════════════════════════════════════════════
def ingest_videos(files):
    """Copy uploaded files into VIDEO_DIR under sanitized names and probe them.

    Args:
        files: Iterable of uploads. Each item may be an object exposing the
            source path as ``.name`` or a plain path string (which one is
            received depends on the Gradio version; both are accepted).

    Returns:
        ``(log_text, gallery_markdown)``: one log line per file, plus the
        refreshed gallery table.
    """
    if not files:
        return "No files selected.", video_gallery_md()

    log = []
    for f in files:
        src = getattr(f, "name", f)
        dest_name = f"{_sanitize_name(os.path.basename(src))}.mp4"
        dest = os.path.join(VIDEO_DIR, dest_name)

        # One unreadable upload must not abort the rest of the batch.
        try:
            shutil.copy(src, dest)
        except OSError as exc:
            log.append(f"❌ {dest_name}: copy failed ({exc})")
            continue

        meta = _probe_video(dest)
        if not meta["ok"]:
            log.append(f"⚠️ {dest_name} β€” saved (ffprobe unavailable, verify manually)")
            continue

        warns = []
        if meta["duration"] > 10:
            warns.append(f"duration {meta['duration']}s > 10s")
        if meta["frames"] < 8:
            warns.append(f"only {meta['frames']} frames")
        warn_str = f" ⚠️ {', '.join(warns)}" if warns else ""
        log.append(
            f"βœ… {dest_name} β€” {meta['duration']}s | "
            f"{meta['fps']}fps | {meta['width']}Γ—{meta['height']}{warn_str}"
        )
    return "\n".join(log), video_gallery_md()
def trim_video(source_path, start_time, end_time, output_stem):
    """Cut ``start_time``..``end_time`` out of a video with FFmpeg.

    The result is re-encoded (libx264 / aac) and written to
    ``VIDEO_DIR/<stem>.mp4``, overwriting any existing file of that name.

    Args:
        source_path: Path of the video to cut.
        start_time: Start offset in seconds.
        end_time: End offset in seconds; must be greater than ``start_time``.
        output_stem: Desired output name. When blank, the sanitized source
            stem plus ``_trimmed`` is used.

    Returns:
        ``(log_text, gallery_markdown)``. Problems are reported in the log
        text; nothing is raised.
    """
    if not source_path:
        return "❌ No source file path provided.", video_gallery_md()
    # A cleared number field arrives as None; reject before building the command.
    if start_time is None or end_time is None:
        return "❌ Start and end times are required.", video_gallery_md()
    if end_time <= start_time:
        return "❌ End time must be greater than start time.", video_gallery_md()

    requested = (output_stem or "").strip()
    if requested:
        stem = _sanitize_name(requested)
    else:
        stem = _sanitize_name(Path(source_path).stem) + "_trimmed"
    if not stem:
        # An empty stem would create a hidden ".mp4" the gallery never lists.
        return "❌ Output stem has no usable characters (use a-z, 0-9, _).", video_gallery_md()

    dest = os.path.join(VIDEO_DIR, f"{stem}.mp4")
    try:
        cmd = [
            "ffmpeg", "-y", "-i", source_path,
            "-ss", str(start_time), "-to", str(end_time),
            "-c:v", "libx264", "-c:a", "aac", dest
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            # Only the tail of stderr: FFmpeg prints a long banner first.
            return f"❌ FFmpeg error:\n{result.stderr[-500:]}", video_gallery_md()
        meta = _probe_video(dest)
        return (
            f"βœ… Trimmed β†’ {stem}.mp4\n"
            f" Duration: {meta['duration']}s | FPS: {meta['fps']} | "
            f"{meta['width']}Γ—{meta['height']} | Frames: {meta['frames']}"
        ), video_gallery_md()
    except Exception as e:
        return f"❌ Error: {e}", video_gallery_md()
def delete_video(stem):
    """Delete ``<stem>.mp4`` from VIDEO_DIR and ``<stem>.txt`` from CAPTION_DIR.

    Args:
        stem: Bare clip name typed by the user (no directory, no extension).

    Returns:
        ``(log_text, gallery_markdown)`` listing what was removed, or a
        warning when neither file exists.
    """
    stem = (stem or "").strip()
    if not stem:
        return "❌ No stem provided.", video_gallery_md()
    # The stem is free text from the UI: refuse anything that could escape
    # the dataset folders (e.g. "../../other/file").
    if stem in (".", "..") or os.path.basename(stem) != stem:
        return f"❌ Invalid stem: '{stem}'", video_gallery_md()

    msgs = []
    for ext, folder in [(".mp4", VIDEO_DIR), (".txt", CAPTION_DIR)]:
        try:
            os.remove(os.path.join(folder, f"{stem}{ext}"))
        except FileNotFoundError:
            continue
        msgs.append(f"πŸ—‘οΈ Deleted {stem}{ext}")
    if not msgs:
        msgs.append(f"⚠️ No files found for stem: '{stem}'")
    return "\n".join(msgs), video_gallery_md()
def video_gallery_md() -> str:
    """Render every ingested video as a Markdown table.

    Columns: stem, duration, FPS, resolution, and whether a caption file
    exists. Zero-valued metadata is shown as ``?``. A placeholder message is
    returned when there are no videos.
    """
    clips = _get_all_videos()
    if not clips:
        return "πŸ“­ No videos yet. Upload `.mp4` files above."

    def _row(clip: dict) -> str:
        duration = f"{clip['duration']}s" if clip["duration"] else "?"
        rate = f"{clip['fps']}fps" if clip["fps"] else "?"
        size = f"{clip['width']}Γ—{clip['height']}" if clip["width"] else "?"
        if clip["has_caption"]:
            caption_mark = "βœ…"
        else:
            caption_mark = "⚠️"
        return f"| `{clip['stem']}` | {duration} | {rate} | {size} | {caption_mark} |"

    table = [
        f"### 🎬 {len(clips)} Video(s) in Dataset",
        "| Stem | Duration | FPS | Resolution | Caption |",
        "|------|----------|-----|------------|---------|",
    ]
    table.extend(_row(clip) for clip in clips)
    return "\n".join(table)
def get_video_stems():
    """Return the stems of all ``*.mp4`` files in VIDEO_DIR, ordered by path."""
    pattern = os.path.join(VIDEO_DIR, "*.mp4")
    stems = []
    for video_path in sorted(glob.glob(pattern)):
        stems.append(Path(video_path).stem)
    return stems
# ═════════════════════════════════════════════════════════════════════════════
# TAB 2 — CAPTION STUDIO
# ═════════════════════════════════════════════════════════════════════════════
def load_caption_for_stem(stem):
    """Load the caption text stored for *stem*.

    Args:
        stem: Clip name; a falsy value means nothing is selected.

    Returns:
        ``(caption_text, status_markdown)``. The text is ``""`` when nothing
        is selected or no caption file exists yet.
    """
    if not stem:
        return "", "Select a video above."

    cap_path = os.path.join(CAPTION_DIR, f"{stem}.txt")
    # Read as UTF-8 (what save_caption writes) and close the handle promptly.
    try:
        with open(cap_path, encoding="utf-8") as fh:
            text = fh.read()
    except FileNotFoundError:
        return "", f"πŸ“­ No caption yet for `{stem}` β€” write one and save."
    return text, f"πŸ“‚ Loaded caption for `{stem}`"
def save_caption(stem, caption_text):
    """Write the stripped caption for *stem* to CAPTION_DIR as UTF-8.

    Returns ``(status_markdown, caption_summary_markdown)``. Nothing is
    written when no stem is selected or the caption is blank.
    """
    if not stem:
        return "❌ No video selected.", caption_summary_md()

    cleaned = caption_text.strip()
    if not cleaned:
        return "❌ Caption is empty.", caption_summary_md()

    target = os.path.join(CAPTION_DIR, f"{stem}.txt")
    with open(target, "w", encoding="utf-8") as handle:
        handle.write(cleaned)

    status = f"βœ… Saved caption for `{stem}`"
    return status, caption_summary_md()
def build_caption_from_template(subject, action, environment, lighting, camera):
    """Compose a caption draft of up to three sentences from template fields.

    Sentence 1 is ``"A <subject> <action> <environment>."`` built from the
    non-blank fields; sentences 2 and 3 are the lighting and camera fields.
    Blank or ``None`` fields are skipped. Each sentence ends in exactly one
    period, even when the user already typed one.

    Returns:
        The sentences joined by single spaces, or ``""`` when every field is
        blank.
    """

    def _sentence(text: str) -> str:
        # Drop user-typed trailing periods so "Soft light." does not become
        # "Soft light..".
        body = text.strip().rstrip(".").rstrip()
        return f"{body}." if body else ""

    parts = [p.strip() for p in (subject, action, environment) if p and p.strip()]
    sentences = (
        _sentence("A " + " ".join(parts)) if parts else "",
        _sentence(lighting or ""),
        _sentence(camera or ""),
    )
    return " ".join(s for s in sentences if s)
def caption_summary_md() -> str:
    """Render caption status for every video as a Markdown table.

    A clip counts as complete when its caption file exists and is non-empty.
    Complete clips show a preview of at most 60 characters with newlines
    flattened to spaces; the rest are marked missing.
    """
    clips = _get_all_videos()
    if not clips:
        return "πŸ“­ No videos loaded yet."

    def _is_complete(clip: dict) -> bool:
        return bool(clip["has_caption"] and clip["caption"])

    def _row(clip: dict) -> str:
        if not _is_complete(clip):
            return f"| `{clip['stem']}` | ⚠️ Missing | β€” |"
        text = clip["caption"]
        preview = text[:60].replace("\n", " ")
        if len(text) > 60:
            preview = preview + "…"
        return f"| `{clip['stem']}` | βœ… | {preview} |"

    body = [_row(clip) for clip in clips]
    paired = len([clip for clip in clips if _is_complete(clip)])
    table = [
        f"### ✍️ Caption Status β€” {paired}/{len(clips)} complete",
        "| Video | Status | Preview |",
        "|-------|--------|---------|",
    ]
    return "\n".join(table + body)
def generate_bulk_template():
    """Build the bulk-editor text: one ``--- stem`` section per video.

    Each section carries the clip's current caption, or a fill-in placeholder
    when it has none, followed by a blank separator line.
    """
    clips = _get_all_videos()
    if not clips:
        return "πŸ“­ No videos loaded."

    placeholder = "A [subject] [action] [environment]. [lighting]. [camera shot]."
    out = []
    for clip in clips:
        out.extend((f"--- {clip['stem']}", clip["caption"] or placeholder, ""))
    return "\n".join(out)
def save_all_bulk_captions(bulk_text: str):
    """Parse bulk-editor text and write one caption file per section.

    A line starting with ``---`` opens a section and names its stem; the
    following lines up to the next header form the caption. Text before the
    first header is ignored.

    Sections are skipped (and reported) when the caption is blank or the stem
    is not a bare filename; a stem containing a path separator could
    otherwise write outside CAPTION_DIR.

    Returns:
        ``(status_markdown, caption_summary_markdown)``.
    """
    if not bulk_text or not bulk_text.strip():
        return "❌ No text provided.", caption_summary_md()

    saved, skipped = [], []

    def _flush(stem, lines):
        """Write one parsed section, or record why it was skipped."""
        if stem is None:
            return
        text = "\n".join(lines).strip()
        safe = bool(stem) and stem not in (".", "..") and os.path.basename(stem) == stem
        if not text or not safe:
            skipped.append(stem or "(unnamed)")
            return
        # UTF-8 to match save_caption.
        with open(os.path.join(CAPTION_DIR, f"{stem}.txt"), "w", encoding="utf-8") as fh:
            fh.write(text)
        saved.append(stem)

    current_stem, current_lines = None, []
    for line in bulk_text.splitlines():
        if line.startswith("---"):
            _flush(current_stem, current_lines)
            current_stem = line.lstrip("- ").strip()
            current_lines = []
        elif current_stem is not None:
            current_lines.append(line)
    _flush(current_stem, current_lines)

    status = f"βœ… Saved {len(saved)} caption(s): {', '.join(saved)}"
    if skipped:
        status += (
            f"\n\n⚠️ Skipped {len(skipped)} section(s) "
            f"(empty caption or invalid stem): {', '.join(skipped)}"
        )
    return status, caption_summary_md()
# ═════════════════════════════════════════════════════════════════════════════
# TAB 3 — VALIDATOR
# ═════════════════════════════════════════════════════════════════════════════
def run_full_validation():
    """Validate every video and return ``(summary_markdown, detail_markdown)``.

    Each clip is classified by its worst issue: any ❌ message makes it an
    error, otherwise any ⚠️ message makes it a warning, otherwise it is
    ready. The summary lists counts and the affected stems; the detail is a
    per-clip Markdown table.
    """
    videos = _get_all_videos()
    if not videos:
        return "πŸ“­ No videos to validate. Upload files in the Video Ingest tab.", ""

    all_ok, has_warn, has_err = [], [], []
    detail_rows = []
    for v in videos:
        issues = _validation_issues(v)
        if any(i.startswith("❌") for i in issues):
            has_err.append(v["stem"])
            status = "❌ Error"
        elif any(i.startswith("⚠️") for i in issues):
            has_warn.append(v["stem"])
            status = "⚠️ Warning"
        else:
            all_ok.append(v["stem"])
            status = "βœ… Ready"
        # Joined with "; " and not " | ": a pipe inside a table row starts a
        # new cell, and cells beyond the header's column count are dropped,
        # so only the first issue would be shown.
        issue_str = "; ".join(issues) if issues else "β€”"
        detail_rows.append(
            f"| `{v['stem']}` | {v['duration']}s | {v['frames']} | {status} | {issue_str} |"
        )

    summary = (
        f"### Validation Complete β€” {len(videos)} video(s)\n\n"
        f"βœ… **Ready:** {len(all_ok)} | "
        f"⚠️ **Warnings:** {len(has_warn)} | "
        f"❌ **Errors:** {len(has_err)}\n\n"
    )
    if has_err:
        summary += f"**Must fix before export:** {', '.join(f'`{s}`' for s in has_err)}\n\n"
    if has_warn:
        summary += f"**Review recommended:** {', '.join(f'`{s}`' for s in has_warn)}\n\n"
    if not has_err and not has_warn:
        summary += "πŸŽ‰ **All clips are ready to export!**\n\n"

    header = (
        "| Video | Duration | Frames | Status | Issues |\n"
        "|-------|----------|--------|--------|--------|"
    )
    detail = header + "\n" + "\n".join(detail_rows)
    return summary, detail
def naming_check_report():
    """Report video stems that break the ``a-z0-9_`` naming convention.

    Only filenames are inspected, so stems are listed with
    ``get_video_stems`` instead of building full records, which would run
    ffprobe on every video for no benefit.

    Returns:
        One Markdown line per problem, or a success message when every
        filename is valid.
    """
    import re

    stems = get_video_stems()
    if not stems:
        return "πŸ“­ No videos loaded."

    invalid_chars = re.compile(r"[^a-z0-9_]")
    issues = []
    for stem in stems:
        if invalid_chars.search(stem):
            issues.append(f"⚠️ `{stem}` β€” invalid characters (use a-z, 0-9, _ only)")
        if stem != stem.lower():
            issues.append(f"⚠️ `{stem}` β€” contains uppercase")
    return "\n".join(issues) if issues else "βœ… All filenames valid."
# ═════════════════════════════════════════════════════════════════════════════
# TAB 4 — EXPORT & DOWNLOAD
# ═════════════════════════════════════════════════════════════════════════════
def dataset_summary_md() -> str:
    """Render the dataset overview: a metrics table plus a quick checklist.

    Metrics cover clip count, clips with a caption file, clips with no
    validation issues, and total duration. Each checklist line is prefixed
    with a pass mark or a warning/error mark.
    """
    clips = _get_all_videos()
    if not clips:
        return "πŸ“­ No dataset yet."

    count = len(clips)
    paired = len([c for c in clips if c["has_caption"]])
    total_dur = sum(c["duration"] for c in clips)
    ready = len([c for c in clips if not _validation_issues(c)])

    def _mark(passed: bool, otherwise: str) -> str:
        return "βœ…" if passed else otherwise

    min_ok = all(c["duration"] >= 2 for c in clips)
    max_ok = all(c["duration"] <= 10 for c in clips)
    frames_ok = all(c["frames"] >= 8 for c in clips)

    lines = [
        "### πŸ“‹ Dataset Summary",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total videos | {count} |",
        f"| Captioned | {paired} / {count} |",
        f"| Ready to encode | {ready} / {count} |",
        f"| Total duration | {total_dur:.1f}s ({total_dur/60:.1f} min) |",
        "### Quick Checklist",
        f"- {_mark(count >= 10, '⚠️')} 10–20 clips (`{count}` loaded)",
        f"- {_mark(min_ok, '⚠️')} All clips β‰₯ 2 seconds",
        f"- {_mark(max_ok, '⚠️')} All clips ≀ 10 seconds",
        f"- {_mark(frames_ok, '❌')} All clips have β‰₯ 8 frames",
        f"- {_mark(paired == count, '❌')} All videos have captions",
        f"- {_mark(ready == count, '⚠️')} No validation errors",
    ]
    # The original template ends with a newline after the last bullet.
    return "\n".join(lines) + "\n"
def export_dataset_zip():
    """Bundle every video and its caption into a timestamped zip in EXPORT_DIR.

    Export is refused while any clip has a ❌ validation issue.

    Returns:
        ``(status_markdown, zip_path_or_None, dataset_summary_markdown)``.
    """
    clips = _get_all_videos()
    if not clips:
        return "❌ No videos to export.", None, dataset_summary_md()

    # Clips with at least one blocking (❌) issue.
    blocked = []
    for clip in clips:
        if any(issue.startswith("❌") for issue in _validation_issues(clip)):
            blocked.append(clip)
    if blocked:
        names = ", ".join(f"`{clip['stem']}`" for clip in blocked)
        return f"❌ Fix errors first: {names}", None, dataset_summary_md()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_name = f"wan21_dataset_{timestamp}.zip"
    zip_path = os.path.join(EXPORT_DIR, archive_name)

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for clip in clips:
            # Files are stored flat, named by stem, regardless of source folder.
            archive.write(clip["video_path"], arcname=f"{clip['stem']}.mp4")
            if clip["has_caption"]:
                archive.write(clip["caption_path"], arcname=f"{clip['stem']}.txt")

    size_mb = os.path.getsize(zip_path) / (1024 * 1024)
    msg = (
        f"βœ… Exported {len(clips)} pairs β†’ `{archive_name}` "
        f"({size_mb:.1f} MB)\n\n"
        f"Click **Download ZIP** below to save it."
    )
    return msg, zip_path, dataset_summary_md()
# ═════════════════════════════════════════════════════════════════════════════
# THEME & CSS
# ═════════════════════════════════════════════════════════════════════════════
# Dark violet theme. Every colour token below is applied to both the regular
# and the "_dark" variant, so the app looks the same in either browser mode.
THEME = gr.themes.Base(
    primary_hue=gr.themes.colors.violet,
    secondary_hue=gr.themes.colors.purple,
    neutral_hue=gr.themes.colors.slate,
    font=gr.themes.GoogleFont("Inter"),
).set(
    **{
        f"{token}{variant}": colour
        for token, colour in (
            ("body_background_fill", "#0a0a0f"),
            ("block_background_fill", "#12121a"),
            ("block_border_color", "#1e1e2e"),
            ("block_label_text_color", "#a78bfa"),
            ("block_title_text_color", "#e2e8f0"),
            ("body_text_color", "#cbd5e1"),
            ("button_primary_background_fill", "#7c3aed"),
            ("button_primary_background_fill_hover", "#6d28d9"),
            ("button_primary_text_color", "#ffffff"),
            ("button_secondary_background_fill", "#1e1e2e"),
            ("button_secondary_text_color", "#a78bfa"),
            ("input_background_fill", "#1a1a2e"),
            ("input_border_color", "#2d2d44"),
        )
        for variant in ("", "_dark")
    },
    shadow_drop="0 4px 14px rgba(124, 58, 237, 0.08)",
    shadow_drop_lg="0 8px 24px rgba(124, 58, 237, 0.12)",
)
# Extra stylesheet passed to gr.Blocks(css=...). It defines the classes used
# by build_ui: .main-title / .sub-title (page header), .status-bar (summary
# panel) and .tip-box (hint call-outs), and caps the page width at 980px.
CSS = """
.gradio-container { max-width: 980px !important; margin: auto; }
.main-title {
text-align: center;
background: linear-gradient(135deg, #7c3aed 0%, #a78bfa 50%, #c4b5fd 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-size: 2.1rem;
font-weight: 800;
margin-bottom: 0.15rem;
letter-spacing: -0.5px;
}
.sub-title {
text-align: center;
color: #64748b;
font-size: 0.92rem;
margin-bottom: 1rem;
}
.status-bar {
padding: 10px 16px;
background: linear-gradient(135deg, #1a1a2e, #16162a);
border: 1px solid #2d2d44;
border-radius: 8px;
font-size: 0.9rem;
}
.tip-box {
background: #13131f;
border-left: 3px solid #7c3aed;
border-radius: 0 8px 8px 0;
padding: 10px 14px;
margin: 6px 0;
font-size: 0.88rem;
color: #94a3b8;
}
"""
# ═════════════════════════════════════════════════════════════════════════════
# UI ASSEMBLY
# ═════════════════════════════════════════════════════════════════════════════
def build_ui():
    """Assemble the Gradio Blocks app and return it without launching.

    The layout has a header with a dataset summary followed by four tabs
    (Video Ingest, Caption Studio, Validator, Export & Download). Each tab
    creates its widgets and then wires their events to the module-level
    handler functions.

    Returns:
        The configured ``gr.Blocks`` instance.
    """
    # Only the Blocks construction is wrapped, so DeprecationWarnings raised
    # while creating the container are suppressed and nothing else is.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        blocks = gr.Blocks(theme=THEME, css=CSS, title="WAN 2.1 Dataset Creator")

    with blocks:
        gr.HTML("<div class='main-title'>🎬 WAN 2.1 Dataset Creator</div>")
        gr.HTML("<div class='sub-title'>Prepare Β· Caption Β· Validate Β· Export β†’ WAN 2.1 Latent Cacher</div>")
        # The function itself (not its result) is passed as the value.
        # NOTE(review): presumably Gradio calls it when the page loads; confirm.
        gr.Markdown(value=dataset_summary_md, elem_classes=["status-bar"])

        # ── TAB 1: VIDEO INGEST ───────────────────────────────────────────
        with gr.Tab("🎬 Video Ingest", id="ingest"):
            gr.Markdown(
                "### Step 1 & 2 β€” Upload & Trim Source Clips\n"
                "Upload raw `.mp4` files. Filenames are auto-sanitised to `lowercase_underscore`."
            )
            gr.HTML("<div class='tip-box'>πŸ’‘ <b>Sweet spot:</b> 3–5 second clips at 720p+, 24–30fps. "
                    "Aim for 10–20 clips per LoRA concept.</div>")

            # Upload section.
            gr.Markdown("#### πŸ“€ Upload Videos")
            with gr.Row():
                upload_files = gr.File(
                    label="Drop .mp4 files here",
                    file_count="multiple",
                    file_types=[".mp4"],
                    scale=3,
                )
                upload_btn = gr.Button("⬆️ Ingest Files", variant="primary", scale=1)
            ingest_log = gr.Textbox(label="Ingest Log", lines=5, interactive=False)

            # Trim section: source path, output stem and time range for trim_video.
            gr.Markdown("---\n#### βœ‚οΈ Trim a Clip with FFmpeg")
            with gr.Row():
                trim_source = gr.Textbox(
                    label="Source path (full path in /tmp/dataset_builder/videos/)",
                    placeholder="/tmp/dataset_builder/videos/raw_footage.mp4",
                    scale=3,
                )
                trim_stem = gr.Textbox(label="Output stem name", placeholder="clip_01", scale=1)
            with gr.Row():
                trim_start = gr.Number(label="Start (seconds)", value=0, minimum=0)
                trim_end = gr.Number(label="End (seconds)", value=5, minimum=0)
            trim_btn = gr.Button("βœ‚οΈ Trim & Save", variant="primary")
            trim_log = gr.Textbox(label="Trim Log", lines=3, interactive=False)

            # Delete section.
            gr.Markdown("---\n#### πŸ—‘οΈ Remove a Clip")
            with gr.Row():
                del_stem = gr.Textbox(label="Stem to delete", placeholder="clip_01", scale=3)
                del_btn = gr.Button("πŸ—‘οΈ Delete", variant="secondary", scale=1)
            del_log = gr.Textbox(label="Delete Log", lines=2, interactive=False)

            gr.Markdown("---")
            refresh_gallery_btn = gr.Button("πŸ”„ Refresh Gallery", variant="secondary")
            gallery_md = gr.Markdown(value=video_gallery_md)

            # Every handler below returns (log text, refreshed gallery table).
            # Ingestion is bound to both the button click and the upload event.
            upload_btn.click(fn=ingest_videos, inputs=[upload_files], outputs=[ingest_log, gallery_md])
            upload_files.upload(fn=ingest_videos, inputs=[upload_files], outputs=[ingest_log, gallery_md])
            trim_btn.click(fn=trim_video, inputs=[trim_source, trim_start, trim_end, trim_stem], outputs=[trim_log, gallery_md])
            del_btn.click(fn=delete_video, inputs=[del_stem], outputs=[del_log, gallery_md])
            refresh_gallery_btn.click(fn=video_gallery_md, outputs=[gallery_md])

        # ── TAB 2: CAPTION STUDIO ─────────────────────────────────────────
        with gr.Tab("✍️ Caption Studio", id="captions"):
            gr.Markdown(
                "### Step 3 β€” Write Captions\n"
                "Every `.mp4` needs a matching `.txt` caption describing subject, action, "
                "environment, lighting, and camera."
            )
            gr.HTML("<div class='tip-box'>πŸ’‘ Good captions: <b>subject + action + environment + "
                    "lighting + camera</b>. 1–3 sentences. Specific and consistent.</div>")
            with gr.Tabs():
                with gr.Tab("πŸ–ŠοΈ Per-Clip Editor"):
                    with gr.Row():
                        # Choices are computed once, when the UI is built; the
                        # refresh button next to it re-reads the video folder.
                        stem_dropdown = gr.Dropdown(
                            label="Select Video",
                            choices=get_video_stems(),
                            scale=3,
                        )
                        refresh_stems_btn = gr.Button("πŸ”„", scale=1, variant="secondary")
                    load_status = gr.Markdown("")
                    caption_box = gr.Textbox(
                        label="Caption Text",
                        lines=5,
                        placeholder=(
                            "A fluffy orange tabby cat playing with a red ball on a wooden floor.\n"
                            "Warm natural sunlight streams through a window.\n"
                            "Low angle shot with shallow depth of field."
                        ),
                    )
                    gr.Markdown("#### 🧩 Template Builder")
                    gr.HTML("<div class='tip-box'>πŸ’‘ Fill the fields and click Build to generate a caption draft, "
                            "then edit it freely before saving.</div>")
                    with gr.Row():
                        t_subject = gr.Textbox(label="Subject", placeholder="fluffy orange tabby cat")
                        t_action = gr.Textbox(label="Action", placeholder="playing with a red ball")
                    with gr.Row():
                        t_env = gr.Textbox(label="Environment", placeholder="on a wooden floor in a living room")
                        t_lighting = gr.Textbox(label="Lighting", placeholder="warm natural sunlight from a window")
                    t_camera = gr.Textbox(label="Camera / Shot", placeholder="low angle shot, shallow depth of field")
                    with gr.Row():
                        build_btn = gr.Button("🧩 Build from Template", variant="secondary")
                        save_btn = gr.Button("πŸ’Ύ Save Caption", variant="primary")
                    save_status = gr.Markdown("")
                    cap_summary = gr.Markdown(value=caption_summary_md)

                    # Selecting a clip loads its stored caption into the editor.
                    stem_dropdown.change(
                        fn=load_caption_for_stem,
                        inputs=[stem_dropdown],
                        outputs=[caption_box, load_status],
                    )
                    # Returning a new Dropdown replaces the choices of the existing one.
                    refresh_stems_btn.click(
                        fn=lambda: gr.Dropdown(choices=get_video_stems()),
                        outputs=[stem_dropdown],
                    )
                    # The template output lands in the caption box, where it
                    # can still be edited before saving.
                    build_btn.click(
                        fn=build_caption_from_template,
                        inputs=[t_subject, t_action, t_env, t_lighting, t_camera],
                        outputs=[caption_box],
                    )
                    save_btn.click(
                        fn=save_caption,
                        inputs=[stem_dropdown, caption_box],
                        outputs=[save_status, cap_summary],
                    )
                with gr.Tab("πŸ“ Bulk Editor"):
                    gr.Markdown(
                        "Edit all captions at once. Format:\n"
                        "```\n--- stem_name\nYour caption text here.\n\n--- next_stem\n...\n```\n"
                        "Click **Load Template** to pre-fill existing captions."
                    )
                    with gr.Row():
                        load_bulk_btn = gr.Button("πŸ“‚ Load Template", variant="secondary")
                        save_bulk_btn = gr.Button("πŸ’Ύ Save All", variant="primary")
                    bulk_box = gr.Textbox(label="Bulk Caption Editor", lines=22,
                                          placeholder="Click 'Load Template' to start…")
                    bulk_status = gr.Markdown("")
                    bulk_summary = gr.Markdown(value=caption_summary_md)

                    load_bulk_btn.click(fn=generate_bulk_template, outputs=[bulk_box])
                    save_bulk_btn.click(
                        fn=save_all_bulk_captions,
                        inputs=[bulk_box],
                        outputs=[bulk_status, bulk_summary],
                    )

        # ── TAB 3: VALIDATOR ──────────────────────────────────────────────
        with gr.Tab("βœ… Validator", id="validate"):
            gr.Markdown(
                "### Step 4 β€” Validate Your Dataset\n"
                "Check every clip for duration, frame count, resolution, naming, and captions."
            )
            with gr.Row():
                validate_btn = gr.Button("β–Ά Run Full Validation", variant="primary", scale=2)
                naming_btn = gr.Button("πŸ”€ Check Naming Only", variant="secondary", scale=1)
            # Output areas, filled in by the two buttons above.
            validation_summary = gr.Markdown("")
            naming_report = gr.Markdown("")
            validation_detail = gr.Markdown("")
            gr.Markdown("---\n#### πŸ“ Requirements Reference")
            gr.Markdown("""
| Parameter | Requirement |
|-----------|-------------|
| Format | `.mp4` only |
| Duration | 2–10 seconds (sweet spot: 3–5s) |
| FPS | 24–30 fps recommended |
| Resolution | 720p+ (auto-resized to 480Γ—832) |
| Min frames | 8 frames minimum |
| Caption | Required, 1–3 sentences |
| Filename | Lowercase, underscores, no spaces |
""")
            validate_btn.click(fn=run_full_validation, outputs=[validation_summary, validation_detail])
            naming_btn.click(fn=naming_check_report, outputs=[naming_report])

        # ── TAB 4: EXPORT ─────────────────────────────────────────────────
        with gr.Tab("πŸ“¦ Export & Download", id="export"):
            gr.Markdown(
                "### Step 5 β€” Export Dataset\n"
                "Packages all validated video + caption pairs into a single `.zip` for download."
            )
            gr.Markdown(value=dataset_summary_md, label="Dataset Summary")
            gr.HTML("<div class='tip-box'>πŸ’‘ Fix all ❌ validation errors before exporting. "
                    "⚠️ warnings are safe to ignore.</div>")
            export_btn = gr.Button("πŸ“¦ Build & Download ZIP", variant="primary", size="lg")
            export_status = gr.Markdown("")
            # Receives the zip path returned by export_dataset_zip.
            download_file = gr.File(label="⬇️ Download ZIP", visible=True)
            export_summary = gr.Markdown(value=dataset_summary_md)
            gr.Markdown("---\n#### πŸ“‹ Dataset Checklist")
            gr.Markdown("""
```
DATASET CHECKLIST
─────────────────────────────────────────
β–‘ 10–20 clips, each 3–5 seconds
β–‘ All .mp4 format, 720p+, 24–30 fps
β–‘ Matching .txt caption for EVERY video
β–‘ Filenames: lowercase, underscores, no spaces
β–‘ Captions: 1–3 sentences β€”
subject Β· action Β· environment Β· lighting Β· camera
β–‘ No watermarks, black frames, or blurry footage
β–‘ All pairs validated βœ… in Validator tab
```
""")
            gr.Markdown("---\n#### ⚠️ Common Issues")
            gr.Markdown("""
| Problem | Solution |
|---------|----------|
| "Missing caption" | Create `.txt` with exact same stem as `.mp4` |
| "Only N frames, need 8" | Clip too short β€” use β‰₯ 1 second at 24fps |
| Blurry latent outputs | Source too low-res β€” use 720p+ footage |
| LoRA overfits | More variety β€” different angles, lighting, backgrounds |
| LoRA doesn't learn concept | Captions too vague β€” be more specific |
""")
            # The handler returns (status text, zip path or None, refreshed summary).
            export_btn.click(
                fn=export_dataset_zip,
                outputs=[export_status, download_file, export_summary],
            )

        # ── FOOTER ────────────────────────────────────────────────────────
        gr.HTML(
            "<div style='text-align:center;color:#475569;font-size:0.8rem;margin-top:1rem;'>"
            "WAN 2.1 Dataset Creator β€’ HuggingFace Spaces Edition β€’ video pairs β†’ safetensors"
            "</div>"
        )
    return blocks
# ═════════════════════════════════════════════════════════════════════════════
# LAUNCH (HuggingFace Spaces — no share=True needed)
# ═════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    app = build_ui()
    # Enable the request queue before serving.
    app.queue()
    # No share tunnel, and bind to all interfaces, as the module docstring
    # specifies for Spaces compatibility (the code previously passed
    # share=True, contradicting it).
    app.launch(share=False, server_name="0.0.0.0")