# NOTE: extracted from a web file viewer (reported file size: 5,087 bytes;
# revisions shown in the blame gutter: e30142e, 83e5fff).
import os
import subprocess
import pathlib
import hashlib
import requests
import gradio as gr

# ----------------------------------------------------------------------
# 1️⃣  Download a static FFmpeg build (Linux x86_64, CPU only)
# ----------------------------------------------------------------------
# Release asset to download. The path uses the "latest" release tag, so the
# exact FFmpeg build is presumably not pinned -- NOTE(review): confirm whether
# a fixed release should be used for reproducibility.
FFMPEG_URL = (
    "https://github.com/BtbN/FFmpeg-Builds/releases/download"
    "/latest/ffmpeg-master-latest-linux64-gpl.tar.xz"
)
# Directory (relative to the current working directory) holding the binary.
FFMPEG_DIR = pathlib.Path("ffmpeg")
# Location where the extracted ffmpeg executable is expected.
BINARY_PATH = FFMPEG_DIR / "ffmpeg"

def _download_and_extract():
    """Download the static FFmpeg build and unpack its ``ffmpeg`` binary.

    Does nothing when ``BINARY_PATH`` already exists.  Otherwise the archive
    at ``FFMPEG_URL`` is streamed to ``FFMPEG_DIR / "ffmpeg.tar.xz"``, the
    first regular file whose archive path ends in ``/ffmpeg`` is extracted
    flat into ``FFMPEG_DIR`` (so it lands at ``BINARY_PATH``), and the binary
    is marked executable.  The archive is removed afterwards, whether or not
    the download and extraction succeeded.

    Raises
    ------
    requests.HTTPError
        If the server answers the download with an error status.
    RuntimeError
        If the archive does not contain an ``ffmpeg`` binary.
    """
    import tarfile

    if BINARY_PATH.is_file():
        return

    FFMPEG_DIR.mkdir(parents=True, exist_ok=True)
    tar_path = FFMPEG_DIR / "ffmpeg.tar.xz"

    try:
        # Stream the download so the whole archive is never held in memory.
        with requests.get(FFMPEG_URL, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tar_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        with tarfile.open(tar_path, "r:xz") as tar:
            # Iterate lazily and stop at the first match instead of reading
            # the full member list up front.
            for member in tar:
                # Require a regular file so a directory or link that happens
                # to be called "ffmpeg" is never picked.
                if member.isfile() and member.name.endswith("/ffmpeg"):
                    # Flatten: drop the archive's directory prefix so the
                    # binary is written directly into FFMPEG_DIR.
                    member.name = os.path.basename(member.name)
                    tar.extract(member, path=FFMPEG_DIR)
                    break
            else:
                raise RuntimeError(
                    "No 'ffmpeg' binary found in the downloaded archive."
                )
    finally:
        # Never leave a partial or already-used archive behind.
        if tar_path.exists():
            tar_path.unlink()

    BINARY_PATH.chmod(0o755)

# Executed at import time: importing this module may trigger the download.
_download_and_extract()

# ----------------------------------------------------------------------
# 2️⃣  Helper: run FFmpeg safely
# ----------------------------------------------------------------------
# Flags that must be followed by exactly one value token.
_FFMPEG_VALUE_FLAGS = frozenset({
    "-c:a", "-c:v", "-b:a", "-b:v", "-ar", "-ac", "-map", "-f", "-loglevel",
})
# Flags that take no value.
_FFMPEG_SWITCH_FLAGS = frozenset({"-vn", "-an", "-y"})
# Characters allowed in a flag value.  Deliberately excludes "/", "\\", "-"
# and shell/URL punctuation so a value can never be a path, a protocol URL or
# another option.
_FFMPEG_VALUE_CHARS = frozenset(
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.:"
)


def _validate_ffmpeg_args(args):
    """Split *args* on whitespace and return the tokens if all are allowed."""
    tokens = (args or "").split()
    pending_flag = None  # value-taking flag still waiting for its value
    for tok in tokens:
        if pending_flag is not None:
            if not set(tok) <= _FFMPEG_VALUE_CHARS:
                raise ValueError(
                    f"Invalid value {tok!r} for FFmpeg flag {pending_flag!r}."
                )
            pending_flag = None
        elif tok in _FFMPEG_VALUE_FLAGS:
            pending_flag = tok
        elif tok not in _FFMPEG_SWITCH_FLAGS:
            # Unknown flag, or a stray bare word that FFmpeg would treat as
            # an extra output path.
            raise ValueError("Unsupported FFmpeg flag detected.")
    if pending_flag is not None:
        raise ValueError(f"Missing value for FFmpeg flag {pending_flag!r}.")
    return tokens


def run_ffmpeg(input_bytes: bytes, args: str) -> bytes:
    """
    Execute a very limited FFmpeg command.

    The input is written to a private temporary directory, FFmpeg is run as
    ``ffmpeg -loglevel quiet -i <input> <args> <output>``, and the bytes of
    the output file are returned.  The temporary directory is removed on
    every exit path, so concurrent calls do not share files.

    Parameters
    ----------
    input_bytes : bytes
        Raw input file (e.g., video or audio) uploaded by the user.
    args : str
        Command-line arguments *after* the input and before the output.
        Example: "-c:a aac -b:a 128k" (convert audio to AAC).
        Only ``-c:a -c:v -b:a -b:v -ar -ac -map -f -loglevel`` (each with one
        value made of letters, digits, ``_``, ``.`` and ``:``) and the
        switches ``-vn -an -y`` are accepted.  The output file is named
        ``output.tmp``; NOTE(review): FFmpeg normally infers the container
        from the output extension, so callers most likely need ``-f <format>``.

    Returns
    -------
    bytes
        Output file produced by FFmpeg.

    Raises
    ------
    ValueError
        If *args* contains a flag or value outside the whitelist.
    subprocess.CalledProcessError
        If FFmpeg exits with a non-zero status.
    subprocess.TimeoutExpired
        If FFmpeg runs longer than 30 seconds.
    RuntimeError
        If FFmpeg succeeds but leaves no output file.
    """
    import tempfile

    # Validate before touching the filesystem or spawning anything.
    tokenised = _validate_ffmpeg_args(args)

    with tempfile.TemporaryDirectory() as workdir:
        input_path = pathlib.Path(workdir) / "input.tmp"
        output_path = pathlib.Path(workdir) / "output.tmp"
        input_path.write_bytes(input_bytes)

        cmd = [
            str(BINARY_PATH),
            "-loglevel", "quiet",               # suppress noisy output
            "-i", str(input_path),              # input file
            *tokenised,                         # validated user args
            str(output_path),                   # output file
        ]

        # Short timeout keeps a single job from blocking the app; stdin is
        # detached so FFmpeg can never wait on an interactive prompt.
        subprocess.run(
            cmd,
            check=True,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=30,
        )

        if not output_path.is_file():
            raise RuntimeError("FFmpeg did not produce an output file.")

        return output_path.read_bytes()

# ----------------------------------------------------------------------
# 3️⃣  Gradio UI
# ----------------------------------------------------------------------
def ffmpeg_interface(file, args: str) -> str:
    """
    Gradio wrapper that takes an uploaded file and FFmpeg args,
    returns the path of the processed file.

    Parameters
    ----------
    file
        The upload as delivered by the ``gr.File`` input.  NOTE(review):
        depending on the Gradio version and the component's ``type`` setting
        this is presumably either a filesystem path or a temp-file object
        exposing the path as ``.name``; both are handled.
    args : str
        FFmpeg arguments, forwarded unchanged to ``run_ffmpeg``.

    Returns
    -------
    str
        Path of a temporary file holding FFmpeg's output, for the ``gr.File``
        output component to serve.

    Raises
    ------
    ValueError
        If no file was uploaded, or ``run_ffmpeg`` rejects *args*.
    """
    import tempfile

    if file is None:
        raise ValueError("No input file was uploaded.")

    # Resolve the upload to a path instead of calling .read() on it: a plain
    # path string has no .read(), and reading via the path works for both
    # delivery styles.
    if isinstance(file, (str, os.PathLike)):
        source = file
    else:
        source = getattr(file, "name", None)
    if source is None:
        raise ValueError("Could not determine the uploaded file's path.")

    args = args or ""
    output_bytes = run_ffmpeg(pathlib.Path(source).read_bytes(), args)

    # Same heuristic the MIME guess used, expressed as a file suffix so the
    # download gets a sensible name; fallback is a generic binary suffix.
    suffix = ".bin"
    if "-c:a aac" in args or ".aac" in args:
        suffix = ".aac"
    elif "-c:v libx264" in args or ".mp4" in args:
        suffix = ".mp4"

    # delete=False: the file must outlive this call so it can be served.
    # NOTE(review): nothing here removes it afterwards -- confirm cleanup.
    with tempfile.NamedTemporaryFile(
        prefix="ffmpeg_out_", suffix=suffix, delete=False
    ) as out:
        out.write(output_bytes)
    return out.name

# Single-function UI: one file upload and one free-text argument box feed
# ffmpeg_interface; its result is shown in a file output component.
iface = gr.Interface(
    fn=ffmpeg_interface,
    inputs=[
        gr.File(label="Upload video/audio file"),
        gr.Textbox(
            label="FFmpeg arguments (after input, before output)",
            placeholder="-c:a aac -b:a 128k",
        ),
    ],
    outputs=gr.File(label="Processed file"),
    title=" FFmpeg on Hugging Face Spaces (CPU‑only)",
    description=(
        "Upload a media file and supply a limited set of FFmpeg flags. "
        "The app runs on a free‑tier CPU space, so keep jobs short."
    ),
    # NOTE(review): `allow_flagging` may be renamed or deprecated in newer
    # Gradio releases -- confirm against the installed version.
    allow_flagging="never",
    analytics_enabled=False,
)

# Start the web server only when executed as a script, not when imported.
if __name__ == "__main__":
    iface.launch()