# Bifröst · Video → Frames Extractor # ──────────────────────────────────────────────────────── # Standard imports # ──────────────────────────────────────────────────────── import os, re, json, math, time, zipfile, tempfile, subprocess, base64 from pathlib import Path from typing import List, Optional import gradio as gr _num = re.compile(r'(\d+)') def _natural_key(p: Path | str): s = str(p) return [int(t) if t.isdigit() else t.lower() for t in _num.split(s)] def sample_paths(paths: List[Path] | List[str], n: int = 30) -> List[str]: if not paths: return [] paths = sorted(paths, key=_natural_key) total = len(paths) n = max(1, min(n, total)) if n == total: return [str(p) for p in paths] step = (total - 1) / (n - 1) idxs = [round(i * step) for i in range(n)] out, seen = [], set() for i in idxs: if i not in seen: out.append(str(paths[int(i)])) seen.add(int(i)) return out # ──────────────────────────────────────────────────────── # Logo # ──────────────────────────────────────────────────────── APP_DIR = os.getcwd() def load_logo_base64(path: str) -> str: with open(path, "rb") as f: return base64.b64encode(f.read()).decode("utf-8") LOGO_B64 = load_logo_base64(os.path.join(APP_DIR, "bifrost_logo.png")) def render_logo_html(px: int = 96) -> str: return f"""
Bifröst · Video-to-Image Extractor
The Rainbow Bridge to Frames — split video into images with precision

""" # ──────────────────────────────────────────────────────── # System checks # ──────────────────────────────────────────────────────── def _which(name: str) -> Optional[str]: from shutil import which return which(name) FFMPEG = _which("ffmpeg") FFPROBE = _which("ffprobe") if not FFMPEG or not FFPROBE: MISSING_MSG = ( "⚠️ FFmpeg not found. Add a 'packages.txt' with exactly:\n" "ffmpeg\n" "libsm6\n" "libxext6\n" "Then restart the Space." ) else: MISSING_MSG = "" # ──────────────────────────────────────────────────────── # Helpers # ──────────────────────────────────────────────────────── def sanitize_prefix(txt: str) -> str: txt = (txt or "").strip() if not txt: return "" return re.sub(r"[^A-Za-z0-9._-]+", "_", txt)[:80] def ffprobe_json(input_path: str) -> dict: if not FFPROBE: return {} cmd = [FFPROBE, "-v", "error", "-print_format", "json", "-show_streams", "-show_format", input_path] res = subprocess.run(cmd, capture_output=True, text=True) if res.returncode != 0: return {} try: return json.loads(res.stdout) except Exception: return {} def parse_video_info(meta: dict) -> dict: info = {"duration": None, "fps": None, "width": None, "height": None} if not meta: return info try: info["duration"] = float(meta.get("format", {}).get("duration", None)) except Exception: pass vstreams = [s for s in meta.get("streams", []) if s.get("codec_type") == "video"] if vstreams: v = vstreams[0] rfr = v.get("r_frame_rate") or v.get("avg_frame_rate") if rfr and "/" in rfr: try: num, den = rfr.split("/") num = float(num); den = float(den) if den != 0: info["fps"] = num / den except Exception: pass info["width"] = v.get("width") info["height"] = v.get("height") return info def estimate_output_count(mode: str, duration: float | None, in_fps: float | None, every_seconds: float, nth_frame: int, exact_fps: float) -> Optional[int]: if not duration: return None in_fps = in_fps or 30.0 try: if mode == "All frames": return int(math.ceil(duration * in_fps)) if mode == "Every N seconds" and every_seconds > 0: return int(math.ceil(duration / every_seconds)) if mode == "Every Nth frame" and nth_frame > 0: return int(math.ceil((duration * in_fps) / nth_frame)) if mode == "Exact FPS" and exact_fps > 0: return int(math.ceil(duration * exact_fps)) except Exception: return None return None def build_ffmpeg_extract( input_path: str, mode: str, every_seconds: float, nth_frame: int, exact_fps: float, start_time: str, end_time: str, long_side: int, out_format: str, jpg_quality: int, png_level: int, scene_detect: bool, scene_thresh: float, out_pattern: str, ) -> List[str]: if not FFMPEG: raise RuntimeError("FFmpeg not available") cmd = [FFMPEG, "-y"] if start_time: cmd += ["-ss", start_time] cmd += ["-i", input_path] if end_time: cmd += ["-to", end_time] vf = [] if mode == "Every N seconds": vf.append(f"fps={max(1e-6, 1.0/float(every_seconds or 1))}") elif mode == "Every Nth frame": vf.append(f"select='not(mod(n,{max(1, int(nth_frame or 1))}))'") vf.append("setpts=N/FRAME_RATE/TB") elif mode == "Exact FPS": vf.append(f"fps={max(1e-6, float(exact_fps or 1))}") elif mode == "All frames": pass else: vf.append("fps=1") if scene_detect: vf.append(f"select='gt(scene,{float(scene_thresh)})',showinfo") vf.append("setpts=N/FRAME_RATE/TB") if long_side and long_side > 0: vf.append("scale='if(gt(iw,ih),%d,-1)':'if(gt(iw,ih),-1,%d)':force_original_aspect_ratio=decrease" % (long_side, long_side)) if vf: cmd += ["-vf", ",".join(vf)] if out_format == "jpg": cmd += ["-q:v", str(jpg_quality)] elif out_format == "png": cmd += ["-compression_level", str(png_level)] cmd += ["-frame_pts", "1", out_pattern] return cmd def render_progress(pct: float, label: str = "") -> str: pct = max(0.0, min(100.0, pct)) return f'''
{label} {pct:.1f}%
''' # ──────────────────────────────────────────────────────── # Extraction (Step 1) # ──────────────────────────────────────────────────────── def step1_extract( video: gr.File | None, mode: str, every_seconds: float, nth_frame: int, exact_fps: float, start_time: str, end_time: str, long_side: int, out_format: str, jpg_quality: int, png_level: int, scene_detect: bool, scene_thresh: float, prefix_in: str, prog_html: str, preview_all: bool, # ✅ toggle ): if not video or not video.name: yield None, None, "Upload a video.", "", prog_html, None, None, None return if not FFMPEG or not FFPROBE: yield None, None, "FFmpeg missing. See note below.", MISSING_MSG, prog_html, None, None, None return work = Path(tempfile.mkdtemp(prefix="vid2img_")) raw_dir = work / "frames_raw" raw_dir.mkdir(parents=True, exist_ok=True) prefix = sanitize_prefix(prefix_in) or Path(video.name).stem vinfo = parse_video_info(ffprobe_json(video.name)) full_duration = float(vinfo.get("duration") or 0.0) def _parse_ts(ts: str) -> float: if not ts: return 0.0 h, m, s = ts.split(":") if ":" in ts else ("0", "0", ts) return float(h) * 3600 + float(m) * 60 + float(s) st_s = _parse_ts((start_time or "").strip()) et_s = _parse_ts((end_time or "").strip()) if full_duration and st_s > 0: full_duration = max(0.0, full_duration - st_s) if full_duration and et_s > 0 and et_s < (vinfo.get("duration") or 0): full_duration = max(0.0, min(full_duration, et_s)) pattern = str(raw_dir / f"{prefix}_%05d.{out_format}") cmd = build_ffmpeg_extract( input_path=video.name, mode=mode, every_seconds=every_seconds, nth_frame=nth_frame, exact_fps=exact_fps, start_time=(start_time or "").strip(), end_time=(end_time or "").strip(), long_side=long_side, out_format=out_format, jpg_quality=jpg_quality, png_level=png_level, scene_detect=scene_detect, scene_thresh=scene_thresh, out_pattern=pattern, ) cmd = [cmd[0], "-progress", "pipe:2"] + cmd[1:] cmd_preview = " ".join([s if " " not in s else f'"{s}"' for s in cmd]) proc = subprocess.Popen( cmd, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, text=True, bufsize=1 ) last_pct = 0.0 gallery_preview = [] while True: line = proc.stderr.readline() if not line and proc.poll() is not None: break line = (line or "").strip() if line.startswith("out_time=") and full_duration > 0: t = line.split("=", 1)[1] try: h, m, s = t.split(":") secs = float(h) * 3600 + float(m) * 60 + float(s) except Exception: secs = 0.0 pct = max(0.0, min(100.0, (secs / full_duration) * 100.0)) if pct - last_pct >= 1.0 or pct in (0.0, 100.0): last_pct = pct gallery_preview = sample_paths(sorted(raw_dir.glob(f"{prefix}_*.{out_format}"), key=_natural_key), 36) yield gallery_preview, None, "Extracting…", cmd_preview, render_progress(pct, f"Extracting {pct:.0f}%"), None, str(raw_dir), prefix ret = proc.wait() frames = sorted(raw_dir.glob(f"{prefix}_*.{out_format}"), key=_natural_key) if preview_all: gallery = [str(p) for p in frames] else: gallery = [str(p) for p in frames] if len(frames) <= 100 else sample_paths(frames, 100) # ZIP name based on original video / custom prefix zip_path = work / f"{prefix}_frames.zip" with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for p in frames: zf.write(p, p.name) if ret != 0 or not frames: err = "" try: err = proc.stderr.read() if proc.stderr else "" except Exception: pass yield gallery, None, f"Extraction failed.\n\n{err}", cmd_preview, render_progress(0.0, "Failed"), None, str(raw_dir), prefix return details = f"Frames extracted: {len(frames)} | Saved to: {raw_dir}" yield gallery, str(zip_path), details, cmd_preview, render_progress(100.0, f"Extracted {len(frames)} frames"), [str(p) for p in frames], str(raw_dir), prefix # ──────────────────────────────────────────────────────── # UI # ──────────────────────────────────────────────────────── def build_ui(): with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.HTML(render_logo_html(88)) gr.Markdown("Extract frames from a video with live progress.") with gr.Row(): video = gr.File(label="Upload video", file_types=[".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v"], type="filepath") preview_all = gr.Checkbox(value=False, label="Preview all frames (may be slow)") with gr.Accordion("Extraction Settings", open=True): with gr.Row(): mode = gr.Dropdown(["Every N seconds", "Every Nth frame", "Exact FPS", "All frames"], value="Every N seconds", label="Mode") every_seconds = gr.Number(value=1.0, label="Every N seconds") nth_frame = gr.Number(value=30, label="Every Nth frame") exact_fps = gr.Number(value=1.0, label="Exact FPS") with gr.Row(): start_time = gr.Textbox(value="", label="Start (HH:MM:SS.mmm)") end_time = gr.Textbox(value="", label="End (HH:MM:SS.mmm)") long_side = gr.Number(value=0, label="Resize long side px (0 = none)") with gr.Row(): out_format = gr.Dropdown(["jpg", "png"], value="jpg", label="Output format") jpg_quality = gr.Slider(2, 31, value=3, step=1, label="JPG quality (2=best)") png_level = gr.Slider(0, 9, value=2, step=1, label="PNG compression level") with gr.Row(): scene_detect = gr.Checkbox(False, label="Scene-change detect") scene_thresh = gr.Slider(0.0, 1.0, value=0.3, step=0.01, label="Scene threshold") prefix_vid = gr.Textbox(value="", label="Filename prefix (defaults to input video name)") btn_extract = gr.Button("Extract Frames", variant="primary") prog = gr.HTML(render_progress(0.0, "Idle")) gallery = gr.Gallery(label="Preview (≤100 or all if toggled)", columns=6, height=480) zip_out = gr.File(label="Download frames ZIP") details = gr.Markdown("Ready.") with gr.Accordion("Show FFmpeg command", open=False): cmd_preview = gr.Textbox(label="ffmpeg command", lines=4) estimate_md = gr.Markdown("Estimated output: —") def _toggle_params(mode_val, fmt): return ( gr.update(visible=(mode_val == "Every N seconds")), gr.update(visible=(mode_val == "Every Nth frame")), gr.update(visible=(mode_val == "Exact FPS")), gr.update(visible=(fmt == "jpg")), gr.update(visible=(fmt == "png")), ) def update_estimate(vfile, mode_val, evs, nth, exfps, st, et): if not vfile or not getattr(vfile, 'name', None): return "Estimated output: —" info = parse_video_info(ffprobe_json(vfile.name)) dur = info.get("duration") def parse_ts(ts: str): if not ts: return 0.0 parts = ts.split(":") if len(parts) == 3: try: return float(parts[0])*3600 + float(parts[1])*60 + float(parts[2]) except Exception: return 0.0 return 0.0 st_s = parse_ts(st or ""); et_s = parse_ts(et or "") if dur: if st_s: dur = max(0.0, dur - st_s) if et_s and et_s < info.get("duration", 0) and et_s > 0: dur = min(dur, et_s) est = estimate_output_count(mode_val, dur, info.get("fps"), evs or 1.0, int(nth or 1), exfps or 1.0) return f"Estimated output: **~{est} frames**" if est else "Estimated output: —" mode.change(_toggle_params, [mode, out_format], [every_seconds, nth_frame, exact_fps, jpg_quality, png_level]) out_format.change(_toggle_params, [mode, out_format], [every_seconds, nth_frame, exact_fps, jpg_quality, png_level]) demo.load(_toggle_params, [mode, out_format], [every_seconds, nth_frame, exact_fps, jpg_quality, png_level]) for ctrl in [video, mode, every_seconds, nth_frame, exact_fps, start_time, end_time]: ctrl.change(update_estimate, inputs=[video, mode, every_seconds, nth_frame, exact_fps, start_time, end_time], outputs=[estimate_md]) btn_extract.click( step1_extract, inputs=[video, mode, every_seconds, nth_frame, exact_fps, start_time, end_time, long_side, out_format, jpg_quality, png_level, scene_detect, scene_thresh, prefix_vid, prog, preview_all], outputs=[gallery, zip_out, details, cmd_preview, prog], ) if MISSING_MSG: gr.Markdown(f"{MISSING_MSG}") return demo if __name__ == "__main__": build_ui().queue().launch()