File size: 16,932 Bytes
70b0566
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# app.py
# A lightweight, user-friendly FFmpeg UI for extracting video frames on Hugging Face Spaces.
# - Works on CPU (no GPU required)
# - Shows the exact ffmpeg command used
# - Lets you extract every N seconds, every Nth frame, or at an exact FPS
# - Optional start/end time trims, resize, JPG quality / PNG compression, scene-change detection
# - Returns a ZIP of frames and a gallery preview

import os
import re
import io
import sys
import json
import math
import time
import shutil
import zipfile
import tempfile
import subprocess
from pathlib import Path
from typing import List, Tuple, Optional

import gradio as gr

# ─────────────────────────────────────────────────────────────
# Utility: check for ffmpeg/ffprobe availability
# ─────────────────────────────────────────────────────────────

def _which(name: str) -> Optional[str]:
    """Look up executable *name* via ``shutil.which``; None when it is not found."""
    return shutil.which(name)

FFMPEG = _which("ffmpeg")
FFPROBE = _which("ffprobe")

# Friendly warning shown in the UI footer; stays empty when both binaries were located.
MISSING_MSG = "" if FFMPEG and FFPROBE else (
    "⚠️ FFmpeg not found. On Hugging Face Spaces, add a file named 'packages.txt' "
    "with a single line 'ffmpeg' (and optionally 'libsm6' 'libxext6'). Then restart the Space."
)

# ─────────────────────────────────────────────────────────────
# Video probing via ffprobe
# ─────────────────────────────────────────────────────────────

def ffprobe_json(input_path: str) -> dict:
    """Run ffprobe on *input_path* and return its JSON report as a dict.

    The probe requests both the per-stream and the container ("format")
    sections. Probing is best-effort: an empty dict is returned when ffprobe is
    unavailable, cannot be launched, exits with a non-zero status, or prints
    something that is not a JSON object.
    """
    if not FFPROBE:
        return {}
    cmd = [
        FFPROBE,
        "-v", "error",
        "-print_format", "json",
        "-show_streams",
        "-show_format",
        input_path,
    ]
    try:
        # Decode explicitly as UTF-8 with replacement so odd metadata bytes or a
        # non-UTF-8 locale cannot raise while reading the report.
        res = subprocess.run(
            cmd, capture_output=True, encoding="utf-8", errors="replace"
        )
    except OSError:
        # The binary vanished or is not executable; treat it like a failed probe.
        return {}
    if res.returncode != 0:
        return {}
    try:
        parsed = json.loads(res.stdout)
    except json.JSONDecodeError:
        return {}
    # Callers use .get() on the result, so never hand back a list/str/number.
    return parsed if isinstance(parsed, dict) else {}


def _parse_frame_rate(value) -> Optional[float]:
    """Convert a rate string such as "30000/1001" or "25" to frames per second.

    Returns None for missing, non-string, malformed, zero-denominator ("0/0"),
    non-finite or non-positive rates.
    """
    if not isinstance(value, str) or not value:
        return None
    num_text, _, den_text = value.partition("/")
    try:
        num = float(num_text)
        den = float(den_text) if den_text else 1.0
    except ValueError:
        return None
    if den == 0:
        return None
    rate = num / den
    return rate if math.isfinite(rate) and rate > 0 else None


def parse_video_info(meta: dict) -> dict:
    """Summarise an ffprobe JSON report (as returned by ``ffprobe_json``).

    Args:
        meta: Parsed ffprobe output with optional "format" and "streams" keys.

    Returns:
        A dict with the keys "duration" (float, from the format section),
        "fps" (float), "width", "height" and "codec" (taken from the first
        stream whose codec_type is "video"). Any value that is missing or
        cannot be parsed is None.
    """
    info = {"duration": None, "fps": None, "width": None, "height": None, "codec": None}
    if not meta:
        return info

    # Duration from format; float() rejects a missing or non-numeric value,
    # which simply leaves the duration unknown.
    fmt = meta.get("format")
    raw_duration = fmt.get("duration") if isinstance(fmt, dict) else None
    try:
        info["duration"] = float(raw_duration)
    except (TypeError, ValueError):
        pass

    # Find the first video stream
    streams = meta.get("streams") or []
    v = next(
        (s for s in streams if isinstance(s, dict) and s.get("codec_type") == "video"),
        None,
    )
    if v is not None:
        info["codec"] = v.get("codec_name")
        info["width"] = v.get("width")
        info["height"] = v.get("height")
        # Prefer r_frame_rate, but fall back to avg_frame_rate whenever the
        # first one is present yet unusable (e.g. "0/0"), not only when absent.
        for key in ("r_frame_rate", "avg_frame_rate"):
            fps = _parse_frame_rate(v.get(key))
            if fps is not None:
                info["fps"] = fps
                break
    return info


# ─────────────────────────────────────────────────────────────
# FFmpeg command builder
# ─────────────────────────────────────────────────────────────

def build_ffmpeg_command(
    input_path: str,
    mode: str,
    every_seconds: float,
    nth_frame: int,
    exact_fps: float,
    start_time: str,
    end_time: str,
    long_side: int,
    out_format: str,
    jpg_quality: int,
    png_level: int,
    scene_detect: bool,
    scene_thresh: float,
    out_pattern: str,
) -> List[str]:
    """Return a full ffmpeg command list for subprocess.run.

    Layout of the result::

        ffmpeg -y [-ss START] [-to END] -i INPUT [-vf FILTERS]
               [-q:v N | -compression_level N] -frame_pts 1 OUT_PATTERN

    Args:
        input_path: Video file to read.
        mode: "Every N seconds", "Every Nth frame", "Exact FPS" or
            "All frames"; any other value falls back to one frame per second.
        every_seconds: Interval used by "Every N seconds".
        nth_frame: Step used by "Every Nth frame" (values below 1 become 1).
        exact_fps: Output rate used by "Exact FPS".
        start_time: Optional start position; empty to start at the beginning.
        end_time: Optional end position in the source file; empty for no limit.
        long_side: Target size in pixels for the longer image side; 0 or
            negative disables resizing.
        out_format: "jpg" or "png" (case-insensitive); selects the quality flag.
        jpg_quality: Value for -q:v (lower is higher quality).
        png_level: Value for -compression_level.
        scene_detect: Append a scene-change select filter when true.
        scene_thresh: Scene score a frame must exceed to be kept.
        out_pattern: Output filename pattern handed to ffmpeg as the last argument.

    Raises:
        RuntimeError: If no ffmpeg executable was found at import time.
    """
    if not FFMPEG:
        raise RuntimeError("FFmpeg is not available on this system.")

    cmd = [FFMPEG, "-y"]

    # Optional in/out trims. Both are given as *input* options (before -i) so
    # that both refer to positions in the source file. An output-side -to
    # combined with an input-side -ss is measured from the seek point, which
    # would silently turn the "end time" into a duration.
    if start_time:
        cmd += ["-ss", start_time]
    if end_time:
        cmd += ["-to", end_time]

    cmd += ["-i", input_path]

    # Build filter chain
    vf_parts = []

    # 1) Frame selection / rate
    if mode == "Every N seconds":
        # fps=1/seconds (the interval is clamped to avoid dividing by zero)
        rate = 1.0 / max(every_seconds, 0.000001)
        vf_parts.append(f"fps={rate}")
    elif mode == "Every Nth frame":
        # Keep frames whose index is a multiple of N, then regenerate
        # timestamps from the kept-frame counter so the output has no gaps.
        vf_parts.append(f"select='not(mod(n,{max(nth_frame,1)}))'")
        vf_parts.append("setpts=N/FRAME_RATE/TB")
    elif mode == "Exact FPS":
        vf_parts.append(f"fps={max(exact_fps, 0.000001)}")
    elif mode == "All frames":
        # No explicit fps filter — pass all frames
        pass
    else:
        vf_parts.append("fps=1")

    # 2) Scene change detection (applied after the rate/selection step above)
    if scene_detect:
        # 'gt(scene,THRESH)' keeps only frames whose scene score exceeds THRESH
        vf_parts.append(f"select='gt(scene,{scene_thresh})',showinfo")
        vf_parts.append("setpts=N/FRAME_RATE/TB")

    # 3) Resize by long side
    if long_side and long_side > 0:
        # Whichever of width/height is larger is set to long_side; the other
        # is -1 so the scale filter derives it from the aspect ratio.
        vf_parts.append(
            f"scale='if(gt(iw,ih),{long_side},-1)':'if(gt(iw,ih),-1,{long_side})':force_original_aspect_ratio=decrease"
        )

    # Join vf
    if vf_parts:
        cmd += ["-vf", ",".join(vf_parts)]

    # 4) Output options per format
    ext = out_format.lower()
    if ext == "jpg":
        # Lower -q:v is higher quality. Map slider (2..31) directly
        cmd += ["-q:v", str(jpg_quality)]
    elif ext == "png":
        # Compression level 0..9
        cmd += ["-compression_level", str(png_level)]

    # NOTE(review): -frame_pts 1 makes the image muxer number files from each
    # frame's PTS instead of a running counter, so numbering follows the
    # timestamps produced by the filter chain above — confirm this is intended.
    cmd += ["-frame_pts", "1"]

    # Output pattern
    cmd += [out_pattern]
    return cmd


# ─────────────────────────────────────────────────────────────
# Extraction runtime
# ─────────────────────────────────────────────────────────────

def _clean_prefix(prefix: str) -> str:
    """Reduce a user-supplied filename prefix to one safe path component.

    Anything other than word characters, dots and dashes (path separators,
    '%' used by the ffmpeg pattern, glob wildcards, whitespace) becomes '_';
    leading dots are dropped. Falls back to "frame" when nothing is left.
    """
    cleaned = re.sub(r"[^\w.-]+", "_", (prefix or "").strip()).lstrip(".")
    return cleaned or "frame"


def _timestamp_seconds(text: str) -> Optional[float]:
    """Convert "SS", "MM:SS" or "HH:MM:SS[.mmm]" to seconds; None if blank or unparseable."""
    text = (text or "").strip()
    if not text:
        return None
    parts = text.split(":")
    if len(parts) > 3:
        return None
    try:
        values = [float(p) for p in parts]
    except ValueError:
        return None
    seconds = 0.0
    for value in values:
        seconds = seconds * 60 + value
    return seconds


def _estimate_total_frames(
    cmd: List[str],
    duration: Optional[float],
    in_fps: float,
    mode: str,
    every_seconds: float,
    nth_frame: int,
    exact_fps: float,
    start_time: str,
    end_time: str,
) -> Optional[int]:
    """Best-effort guess of how many frames ffmpeg will write (progress bar only)."""
    try:
        start = _timestamp_seconds(start_time) or 0.0
        end = _timestamp_seconds(end_time)
        window = None if duration is None else max(0.0, duration - start)
        if end is not None:
            # ffmpeg treats -to as a position in the source when it precedes -i
            # and as time elapsed since the seek point when it follows -i, so
            # look at the command that will actually run.
            trim_opts = cmd[: cmd.index("-i")]
            span = max(0.0, end - start) if "-to" in trim_opts else max(0.0, end)
            window = span if window is None else min(window, span)
        if not window:
            return None
        if mode == "Every N seconds" and every_seconds > 0:
            frames = window / every_seconds
        elif mode == "Every Nth frame" and nth_frame > 0:
            frames = window * in_fps / nth_frame
        elif mode == "Exact FPS" and exact_fps > 0:
            frames = window * exact_fps
        elif mode == "All frames":
            frames = window * in_fps
        else:
            return None
        return int(math.ceil(frames))
    except (TypeError, ValueError, OverflowError):
        # Missing/odd numeric inputs only cost us the progress bar.
        return None


def extract_frames(
    video: str | gr.File | None,
    mode: str,
    every_seconds: float,
    nth_frame: int,
    exact_fps: float,
    start_time: str,
    end_time: str,
    long_side: int,
    out_format: str,
    jpg_quality: int,
    png_level: int,
    scene_detect: bool,
    scene_thresh: float,
    prefix: str,
    progress=gr.Progress(track_tqdm=True),
):
    """Extract frames from an uploaded video with ffmpeg and package the result.

    Args:
        video: Uploaded file, either a path string or an object exposing the
            path as ``.name``.
        mode, every_seconds, nth_frame, exact_fps, start_time, end_time,
        long_side, out_format, jpg_quality, png_level, scene_detect,
        scene_thresh: Forwarded to ``build_ffmpeg_command``.
        prefix: Filename prefix for the frames and the ZIP; sanitised before use.
        progress: Gradio progress callback, called with a fraction in 0..1.

    Returns:
        A 4-tuple ``(gallery_paths, zip_path, details, command_preview)``.
        On failure the first two items are None and ``details`` carries the
        error text (including the tail of ffmpeg's stderr).
    """
    # Accept both a plain path string and a file-like object with .name.
    video_path = video if isinstance(video, str) else getattr(video, "name", None)
    if not video_path:
        return None, None, "Please upload a video.", ""

    if not FFMPEG or not FFPROBE:
        return None, None, "FFmpeg is not available. See the note below.", MISSING_MSG

    # Probe video info
    info = parse_video_info(ffprobe_json(video_path))

    # Prepare temp dir
    work = Path(tempfile.mkdtemp(prefix="frames_"))
    out_dir = work / "frames"
    out_dir.mkdir(parents=True, exist_ok=True)

    # Output pattern; the prefix is user text, so keep it from escaping out_dir
    # or injecting '%' / glob characters.
    prefix = _clean_prefix(prefix)
    ext = out_format.lower()
    pattern = str(out_dir / f"{prefix}_%05d.{ext}")
    frame_glob = f"{prefix}_*.{ext}"

    # Build command
    cmd = build_ffmpeg_command(
        input_path=video_path,
        mode=mode,
        every_seconds=every_seconds,
        nth_frame=nth_frame,
        exact_fps=exact_fps,
        start_time=(start_time or "").strip(),
        end_time=(end_time or "").strip(),
        long_side=long_side,
        out_format=ext,
        jpg_quality=jpg_quality,
        png_level=png_level,
        scene_detect=scene_detect,
        scene_thresh=scene_thresh,
        out_pattern=pattern,
    )

    # Friendly command preview
    command_preview = " ".join([sh if " " not in sh else f'"{sh}"' for sh in cmd])

    # Estimate total frames for progress (best-effort)
    total = _estimate_total_frames(
        cmd,
        info.get("duration"),
        info.get("fps") or 30,
        mode,
        every_seconds,
        nth_frame,
        exact_fps,
        start_time,
        end_time,
    )

    # Run ffmpeg, draining stderr so the pipe never fills. Keep the last lines
    # for the error report: once the stream hits EOF nothing is left to read.
    tail_limit = 40
    stderr_tail: List[str] = []
    last_update = time.monotonic()
    with subprocess.Popen(
        cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        text=True,
        errors="replace",
        bufsize=1,
    ) as proc:
        for line in proc.stderr:
            stderr_tail.append(line)
            del stderr_tail[:-tail_limit]
            # Periodically refresh progress based on files present
            now = time.monotonic()
            if now - last_update > 0.2:
                if total:
                    created = sum(1 for _ in out_dir.glob(frame_glob))
                    progress(min(1.0, created / total))
                last_update = now
        ret = proc.wait()

    # Final count
    frame_files = sorted(out_dir.glob(frame_glob))
    created = len(frame_files)

    if ret != 0 or created == 0:
        # Nothing useful was produced; do not leave the temp directory behind.
        shutil.rmtree(work, ignore_errors=True)
        err_text = "".join(stderr_tail)
        return None, None, f"FFmpeg failed or produced no frames.\n\nStderr:\n{err_text}", command_preview

    # Build a small gallery (cap to avoid huge RAM)
    gallery_cap = 60
    gallery_paths = [str(p) for p in frame_files[:gallery_cap]]

    # Zip everything
    zip_path = work / f"{prefix}_frames.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in frame_files:
            zf.write(p, p.name)

    # Info text
    info_lines = []
    if info.get("fps"):
        info_lines.append(f"Input FPS: {info['fps']:.3f}")
    if info.get("duration"):
        info_lines.append(f"Duration: {info['duration']:.2f}s")
    if info.get("width") and info.get("height"):
        info_lines.append(f"Resolution: {info['width']}×{info['height']}")
    info_lines.append(f"Frames extracted: {created}")

    details = "\n".join(info_lines)

    return gallery_paths, str(zip_path), details, command_preview


# ─────────────────────────────────────────────────────────────
# UI
# ─────────────────────────────────────────────────────────────

def build_ui():
    """Build and return the Gradio Blocks app for the frame extractor.

    Lays out the upload, extraction-mode, trim/resize, format and scene-detect
    controls, wires parameter visibility to the selected mode/format, and
    connects the two buttons to ``extract_frames`` and a metadata probe.
    """
    with gr.Blocks(theme=gr.themes.Soft(), css="""
    .cf-title { font-size: 1.6rem; font-weight: 800; }
    .cf-sub { opacity: .8; }
    .cmdbox textarea { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 12px; }
    """) as demo:
        gr.Markdown("""
        <div class="cf-title">FFmpeg Frames UI</div>
        <div class="cf-sub">Extract JPG/PNG frames from videos — friendly controls, exact command shown.</div>
        """)

        with gr.Row():
            video = gr.File(label="Upload video", file_types=[".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v"], type="filepath")

        with gr.Row():
            mode = gr.Dropdown(
                ["Every N seconds", "Every Nth frame", "Exact FPS", "All frames"],
                value="Every N seconds", label="Extraction mode"
            )
            every_seconds = gr.Number(value=1.0, label="Every N seconds (e.g., 0.5 = every 500ms)")
            nth_frame = gr.Number(value=30, label="Every Nth frame (e.g., 30 = 1 frame per 30)")
            exact_fps = gr.Number(value=1.0, label="Exact FPS (e.g., 2.0)")

        with gr.Row():
            start_time = gr.Textbox(value="", label="Start time (HH:MM:SS.mmm, optional)")
            end_time = gr.Textbox(value="", label="End time (HH:MM:SS.mmm, optional)")
            long_side = gr.Number(value=0, label="Resize long side px (0 = no resize)")

        with gr.Row():
            out_format = gr.Dropdown(["jpg", "png"], value="jpg", label="Output format")
            jpg_quality = gr.Slider(2, 31, value=3, step=1, label="JPG quality (2=best, 31=worst)")
            png_level = gr.Slider(0, 9, value=2, step=1, label="PNG compression level (0..9)")

        with gr.Row():
            scene_detect = gr.Checkbox(value=False, label="Enable scene-change extraction (advanced)")
            scene_thresh = gr.Slider(0.0, 1.0, value=0.3, step=0.01, label="Scene threshold (0..1)")
            prefix = gr.Textbox(value="frame", label="Filename prefix")

        with gr.Row():
            run_btn = gr.Button("Extract Frames", variant="primary")
            info_btn = gr.Button("Probe Video (FPS / Duration)")

        with gr.Row():
            gallery = gr.Gallery(label="Preview (first 60 frames)", columns=6, height=300)

        with gr.Row():
            zip_out = gr.File(label="Download all frames as ZIP")

        with gr.Row():
            details = gr.Markdown("Frames extracted: —")

        with gr.Accordion("Show exact ffmpeg command", open=False):
            cmd_preview = gr.Textbox(label="ffmpeg command", lines=4, elem_classes=["cmdbox"])

        # Footer: only rendered when ffmpeg/ffprobe were not found at import time
        if MISSING_MSG:
            gr.Markdown(f"<span style='color:#b45309'>{MISSING_MSG}</span>")

        # Wire behavior: show only the parameter inputs relevant to the chosen
        # mode and output format. The tuple order must match `toggle_outputs`.
        def _toggle_params(mode_val, fmt):
            return (
                gr.update(visible=(mode_val == "Every N seconds")),
                gr.update(visible=(mode_val == "Every Nth frame")),
                gr.update(visible=(mode_val == "Exact FPS")),
                gr.update(visible=(fmt == "jpg")),
                gr.update(visible=(fmt == "png")),
            )

        toggle_inputs = [mode, out_format]
        toggle_outputs = [every_seconds, nth_frame, exact_fps, jpg_quality, png_level]

        mode.change(_toggle_params, inputs=toggle_inputs, outputs=toggle_outputs)
        out_format.change(_toggle_params, inputs=toggle_inputs, outputs=toggle_outputs)
        # Initialize visibility
        demo.load(_toggle_params, inputs=toggle_inputs, outputs=toggle_outputs)

        # Actions
        run_btn.click(
            extract_frames,
            inputs=[
                video, mode, every_seconds, nth_frame, exact_fps,
                start_time, end_time, long_side, out_format, jpg_quality, png_level,
                scene_detect, scene_thresh, prefix
            ],
            outputs=[gallery, zip_out, details, cmd_preview],
            api_name="extract_frames",
        )

        def probe(video: str | gr.File | None):
            """Return a Markdown summary of the uploaded video's FPS, duration and size."""
            # The upload above uses type="filepath", which hands over a plain
            # path string; keep supporting file-like objects that expose .name.
            video_path = video if isinstance(video, str) else getattr(video, "name", None)
            if not video_path:
                return "Upload a video to probe."
            info = parse_video_info(ffprobe_json(video_path))
            lines = ["**Video info**"]
            if info.get("fps"):
                lines.append(f"• FPS: **{info['fps']:.3f}**")
            if info.get("duration"):
                lines.append(f"• Duration: **{info['duration']:.2f}s**")
            if info.get("width") and info.get("height"):
                lines.append(f"• Resolution: **{info['width']}×{info['height']}**")
            if len(lines) == 1:
                # Probing failed or returned nothing usable; say so instead of
                # showing a bare heading.
                lines.append("• No metadata could be read from this file.")
            return "\n".join(lines)

        info_btn.click(probe, inputs=[video], outputs=[details])

    return demo


if __name__ == "__main__":
    # Script entry point: build the Blocks app, then call queue() and launch() on it.
    demo = build_ui()
    demo.queue().launch()