# app.py # A lightweight, user-friendly FFmpeg UI for extracting video frames on Hugging Face Spaces. # - Works on CPU (no GPU required) # - Shows the exact ffmpeg command used # - Lets you extract every N seconds, every Nth frame, or at an exact FPS # - Optional start/end time trims, resize, JPG quality / PNG compression, scene-change detection # - Returns a ZIP of frames and a gallery preview import os import re import io import sys import json import math import time import shutil import zipfile import tempfile import subprocess from pathlib import Path from typing import List, Tuple, Optional import gradio as gr # ───────────────────────────────────────────────────────────── # Utility: check for ffmpeg/ffprobe availability # ───────────────────────────────────────────────────────────── def _which(name: str) -> Optional[str]: from shutil import which return which(name) FFMPEG = _which("ffmpeg") FFPROBE = _which("ffprobe") if not FFMPEG or not FFPROBE: # Friendly message shown in the UI footer if ffmpeg is missing MISSING_MSG = ( "⚠️ FFmpeg not found. On Hugging Face Spaces, add a file named 'packages.txt' " "with a single line 'ffmpeg' (and optionally 'libsm6' 'libxext6'). Then restart the Space." ) else: MISSING_MSG = "" # ───────────────────────────────────────────────────────────── # Video probing via ffprobe # ───────────────────────────────────────────────────────────── def ffprobe_json(input_path: str) -> dict: if not FFPROBE: return {} cmd = [ FFPROBE, "-v", "error", "-print_format", "json", "-show_streams", "-show_format", input_path, ] res = subprocess.run(cmd, capture_output=True, text=True) if res.returncode != 0: return {} try: return json.loads(res.stdout) except Exception: return {} def parse_video_info(meta: dict) -> dict: info = {"duration": None, "fps": None, "width": None, "height": None, "codec": None} if not meta: return info # Duration from format try: info["duration"] = float(meta.get("format", {}).get("duration", None)) except Exception: pass # Find the first video stream vstreams = [s for s in meta.get("streams", []) if s.get("codec_type") == "video"] if vstreams: v = vstreams[0] info["codec"] = v.get("codec_name") info["width"] = v.get("width") info["height"] = v.get("height") # FPS from r_frame_rate rfr = v.get("r_frame_rate") or v.get("avg_frame_rate") if rfr and "/" in rfr: num, den = rfr.split("/") try: num = float(num) den = float(den) if den != 0: info["fps"] = num / den except Exception: pass return info # ───────────────────────────────────────────────────────────── # FFmpeg command builder # ───────────────────────────────────────────────────────────── def build_ffmpeg_command( input_path: str, mode: str, every_seconds: float, nth_frame: int, exact_fps: float, start_time: str, end_time: str, long_side: int, out_format: str, jpg_quality: int, png_level: int, scene_detect: bool, scene_thresh: float, out_pattern: str, ) -> List[str]: """Return a full ffmpeg command list for subprocess.run.""" if not FFMPEG: raise RuntimeError("FFmpeg is not available on this system.") cmd = [FFMPEG, "-y"] # Optional in/out trims if start_time: cmd += ["-ss", start_time] cmd += ["-i", input_path] if end_time: # Use -to for end timestamp relative to input start cmd += ["-to", end_time] # Build filter chain vf_parts = [] # 1) Frame selection / rate if mode == "Every N seconds": # fps=1/seconds rate = 1.0 / max(every_seconds, 0.000001) vf_parts.append(f"fps={rate}") elif mode == "Every Nth frame": # select='not(mod(n\,N))' -> then set fps to input fps to avoid duplicating vf_parts.append(f"select='not(mod(n,{max(nth_frame,1)}))'") vf_parts.append("setpts=N/FRAME_RATE/TB") elif mode == "Exact FPS": vf_parts.append(f"fps={max(exact_fps, 0.000001)}") elif mode == "All frames": # No explicit fps filter — pass all frames pass else: vf_parts.append("fps=1") # 2) Scene change detection (grabs frames when scene changes by threshold) if scene_detect: # Use select filter: 'gt(scene,THRESH)' outputs only scene-change frames vf_parts.append(f"select='gt(scene,{scene_thresh})',showinfo") vf_parts.append("setpts=N/FRAME_RATE/TB") # 3) Resize by long side if long_side and long_side > 0: # Maintain aspect: scale=LONG:-1 sets height auto; but we need to pick which side is longer # Use force_original_aspect_ratio=decrease and -1 for one dim with eval # To keep the *long* side at long_side, we can use scale logic with if(gt(iw,ih),...) vf_parts.append( f"scale='if(gt(iw,ih),{long_side},-1)':'if(gt(iw,ih),-1,{long_side})':force_original_aspect_ratio=decrease" ) # Join vf if vf_parts: cmd += ["-vf", ",".join(vf_parts)] # 4) Output options per format ext = out_format.lower() if ext == "jpg": # Lower -q:v is higher quality. Map slider (2..31) directly cmd += ["-q:v", str(jpg_quality)] elif ext == "png": # Compression level 0..9 cmd += ["-compression_level", str(png_level)] # Avoid timestamps gaps in patterns cmd += ["-frame_pts", "1"] # Output pattern cmd += [out_pattern] return cmd # ───────────────────────────────────────────────────────────── # Extraction runtime # ───────────────────────────────────────────────────────────── def extract_frames( video: gr.File | None, mode: str, every_seconds: float, nth_frame: int, exact_fps: float, start_time: str, end_time: str, long_side: int, out_format: str, jpg_quality: int, png_level: int, scene_detect: bool, scene_thresh: float, prefix: str, progress=gr.Progress(track_tqdm=True), ): if not video or not video.name: return None, None, "Please upload a video.", "" if not FFMPEG or not FFPROBE: return None, None, "FFmpeg is not available. See the note below.", MISSING_MSG # Probe video info meta = ffprobe_json(video.name) info = parse_video_info(meta) # Prepare temp dir work = Path(tempfile.mkdtemp(prefix="frames_")) out_dir = work / "frames" out_dir.mkdir(parents=True, exist_ok=True) # Output pattern ext = out_format.lower() pattern = str(out_dir / f"{prefix}_%05d.{ext}") # Build command cmd = build_ffmpeg_command( input_path=video.name, mode=mode, every_seconds=every_seconds, nth_frame=nth_frame, exact_fps=exact_fps, start_time=start_time.strip(), end_time=end_time.strip(), long_side=long_side, out_format=ext, jpg_quality=jpg_quality, png_level=png_level, scene_detect=scene_detect, scene_thresh=scene_thresh, out_pattern=pattern, ) # Friendly command preview command_preview = " ".join([sh if " " not in sh else f'"{sh}"' for sh in cmd]) # Estimate total frames for progress (best-effort) total = None try: duration = info.get("duration") in_fps = info.get("fps") or 30 if start_time: # Roughly convert HH:MM:SS.mmm to seconds parts = [float(x) for x in re.split(r"[:]", start_time)] if len(parts) == 3: duration = max(0.0, (duration or 0) - (parts[0]*3600 + parts[1]*60 + parts[2])) if end_time: parts = [float(x) for x in re.split(r"[:]", end_time)] if len(parts) == 3: duration = min(duration or 0, (parts[0]*3600 + parts[1]*60 + parts[2])) if duration: if mode == "Every N seconds" and every_seconds > 0: total = int(math.ceil(duration / every_seconds)) elif mode == "Every Nth frame" and in_fps and nth_frame > 0: total = int(math.ceil((duration * in_fps) / nth_frame)) elif mode == "Exact FPS" and exact_fps > 0: total = int(math.ceil(duration * exact_fps)) elif mode == "All frames" and in_fps: total = int(math.ceil(duration * in_fps)) except Exception: total = None # Run ffmpeg and stream stderr for incremental progress by counting files created proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, text=True, bufsize=1) created = 0 last_update = time.time() while True: line = proc.stderr.readline() if not line and proc.poll() is not None: break # Periodically refresh progress based on files present if time.time() - last_update > 0.2: created = len(list(out_dir.glob(f"{prefix}_*.{ext}"))) if total: progress(created / max(total, 1)) last_update = time.time() ret = proc.wait() # Final count frame_files = sorted(out_dir.glob(f"{prefix}_*.{ext}")) created = len(frame_files) if ret != 0 or created == 0: # Read remaining stderr to show message try: err_rest = proc.stderr.read() if proc.stderr else "" except Exception: err_rest = "" return None, None, f"FFmpeg failed or produced no frames.\n\nStderr:\n{err_rest}", command_preview # Build a small gallery (cap to avoid huge RAM) gallery_cap = 60 gallery_paths = [str(p) for p in frame_files[:gallery_cap]] # Zip everything zip_path = work / f"{prefix}_frames.zip" with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for p in frame_files: zf.write(p, p.name) # Info text info_lines = [] if info.get("fps"): info_lines.append(f"Input FPS: {info['fps']:.3f}") if info.get("duration"): info_lines.append(f"Duration: {info['duration']:.2f}s") if info.get("width") and info.get("height"): info_lines.append(f"Resolution: {info['width']}×{info['height']}") info_lines.append(f"Frames extracted: {created}") details = "\n".join(info_lines) return gallery_paths, str(zip_path), details, command_preview # ───────────────────────────────────────────────────────────── # UI # ───────────────────────────────────────────────────────────── def build_ui(): with gr.Blocks(theme=gr.themes.Soft(), css=""" .cf-title { font-size: 1.6rem; font-weight: 800; } .cf-sub { opacity: .8; } .cmdbox textarea { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 12px; } """) as demo: gr.Markdown("""