File size: 3,677 Bytes
1e2eca7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import subprocess
import os
import re
import gradio as gr


def safe_none_on_error(_msg="error"):
    """Helper for unified exception handling (returns None)."""
    return None


def get_video_duration(input_path):
    try:
        result = subprocess.check_output([
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", input_path
        ]).decode().strip()
        return float(result)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return safe_none_on_error("ffprobe error")


def build_ffmpeg_command(input_path, output_path, crf_value):
    return [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx265", "-crf", str(int(crf_value)),
        "-c:a", "aac", output_path
    ]


def get_file_size_mb(path):
    return os.path.getsize(path) / (1024 * 1024)


def summarize_compression(input_path, output_path):
    """Return human-readable stats after compression."""
    orig_size = get_file_size_mb(input_path)
    new_size = get_file_size_mb(output_path)
    reduction = (1 - new_size / orig_size) * 100 if orig_size > 0 else 0

    msg = (f"Compression complete!\n"
           f"Original: {orig_size:.1f} MB\n"
           f"New: {new_size:.1f} MB\n"
           f"Reduced by: {reduction:.1f}%")
    return msg


def compress_video(input_file_path, crf_value, progress=gr.Progress()):
    if input_file_path is not None:
        input_path = input_file_path.name
        base, ext = os.path.splitext(input_path)
        output_path = f"{base}_compressed{ext}"

        duration = get_video_duration(input_path)
        if duration is None:
            return None, "Could not determine video duration. Ensure ffprobe is installed."

        cmd = build_ffmpeg_command(input_path, output_path, crf_value)

        process = subprocess.Popen(
            cmd, stderr=subprocess.PIPE, universal_newlines=True
        )

        time_pattern = re.compile(r"time=(\d+):(\d+):(\d+)\.?\d*")

        for line in iter(process.stderr.readline, ""):
            match = time_pattern.search(line)
            if match and duration > 0:
                h, m, s = map(int, match.groups())
                current_time = h * 3600 + m * 60 + s
                percent = min(int((current_time / duration) * 100), 100)
                progress(percent / 100, desc=f"Compressing... {percent}%")

        process.wait()

        if process.returncode != 0:
            return None, "Compression failed. Ensure ffmpeg is installed."

        try:
            msg = summarize_compression(input_path, output_path)
            return output_path, msg
        except (FileNotFoundError, OSError):
            return None, "Compression finished but error getting file size."

    return None, "No input file provided."


with gr.Blocks(title="Video Compressor") as demo:
    gr.Markdown("# 🎬 Video Compressor (ffmpeg + Gradio)")
    gr.Markdown("Upload a video, choose compression level (CRF), and download the smaller file.")

    with gr.Row():
        with gr.Column():
            input_file_ui = gr.File(label="Upload Video", file_types=[".mp4", ".avi", ".mov", ".mkv"])
            crf_slider = gr.Slider(23, 32, value=28, step=1, label="Compression Level (CRF)")
            compress_btn = gr.Button("Compress Video")
        with gr.Column():
            output_file = gr.File(label="Download Compressed Video")
            status_box = gr.Textbox(label="Status / Log")

    compress_btn.click(
        fn=compress_video,
        inputs=[input_file_ui, crf_slider],
        outputs=[output_file, status_box]
    )

if __name__ == "__main__":
    demo.launch(share=True)