File size: 11,247 Bytes
fad923e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import gradio as gr
import requests
import json
import subprocess
import os
import tempfile
import shutil
from urllib.parse import urlparse
import time

def download_file(url, dest_path):
    """Download a file from URL to destination path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(dest_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        return False

def extract_metadata(video_path):
    """Extract metadata from video file using ffprobe."""
    try:
        cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', '-show_streams', video_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            format_info = data.get('format', {})
            
            metadata = {
                'duration': float(format_info.get('duration', 0)),
                'filesize': int(format_info.get('size', 0)),
                'bitrate': int(format_info.get('bit_rate', 0)),
                'encoder': format_info.get('tags', {}).get('encoder', 'Unknown')
            }
            return metadata
    except Exception as e:
        print(f"Error extracting metadata: {str(e)}")
    return {}

def generate_thumbnail(video_path, thumbnail_path):
    """Generate thumbnail from video."""
    try:
        cmd = [
            'ffmpeg', '-i', video_path, '-ss', '00:00:01',
            '-vframes', '1', '-q:v', '2', thumbnail_path, '-y'
        ]
        subprocess.run(cmd, capture_output=True)
        return os.path.exists(thumbnail_path)
    except:
        return False

def process_ffmpeg_job(json_input, api_url=None):
    """Process FFmpeg job from JSON input or API URL."""
    try:
        # Parse JSON input
        if api_url and api_url.strip():
            response = requests.get(api_url.strip(), timeout=30)
            response.raise_for_status()
            job_data = response.json()
        else:
            job_data = json.loads(json_input)
        
        job_id = job_data.get('id', 'output')
        
        # Create temporary directory for processing
        with tempfile.TemporaryDirectory() as temp_dir:
            # Download all input files
            input_files = []
            for i, input_item in enumerate(job_data.get('inputs', [])):
                file_url = input_item.get('file_url')
                if not file_url:
                    continue
                
                # Determine file extension
                parsed_url = urlparse(file_url)
                filename = os.path.basename(parsed_url.path)
                if not filename:
                    filename = f"input_{i}"
                
                dest_path = os.path.join(temp_dir, f"{i}_{filename}")
                
                print(f"Downloading {file_url}...")
                if download_file(file_url, dest_path):
                    input_files.append(dest_path)
                else:
                    return None, f"Failed to download: {file_url}", None
            
            if not input_files:
                return None, "No input files to process", None
            
            # Build FFmpeg command
            cmd = ['ffmpeg']
            
            # Add input files
            for input_file in input_files:
                cmd.extend(['-i', input_file])
            
            # Add filters
            filter_complex = []
            for filter_item in job_data.get('filters', []):
                filter_str = filter_item.get('filter', '')
                if filter_str:
                    filter_complex.append(filter_str)
            
            if filter_complex:
                cmd.extend(['-filter_complex', ';'.join(filter_complex)])
            
            # Add output options
            for output in job_data.get('outputs', []):
                for option in output.get('options', []):
                    opt = option.get('option', '')
                    arg = option.get('argument', '')
                    if opt:
                        cmd.append(opt)
                        if arg:
                            cmd.append(arg)
            
            # Output file
            output_filename = f"{job_id}_output.mp4"
            output_path = os.path.join(temp_dir, output_filename)
            cmd.append(output_path)
            
            # Execute FFmpeg command
            print(f"Executing FFmpeg command...")
            print(' '.join(cmd))
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode != 0:
                return None, f"FFmpeg error: {result.stderr}", None
            
            # Check if output file exists
            if not os.path.exists(output_path):
                return None, "Output file was not created", None
            
            # Extract metadata if requested
            metadata = {}
            if job_data.get('metadata', {}).get('duration') or \
               job_data.get('metadata', {}).get('filesize') or \
               job_data.get('metadata', {}).get('bitrate') or \
               job_data.get('metadata', {}).get('encoder'):
                metadata = extract_metadata(output_path)
            
            # Generate thumbnail if requested
            thumbnail_path = None
            if job_data.get('metadata', {}).get('thumbnail'):
                thumb_filename = f"{job_id}_thumbnail.jpg"
                thumb_path = os.path.join(temp_dir, thumb_filename)
                if generate_thumbnail(output_path, thumb_path):
                    # Copy thumbnail to permanent location
                    perm_thumb_path = os.path.join(".", thumb_filename)
                    shutil.copy2(thumb_path, perm_thumb_path)
                    thumbnail_path = perm_thumb_path
            
            # Copy output to permanent location
            permanent_output = os.path.join(".", output_filename)
            shutil.copy2(output_path, permanent_output)
            
            # Format metadata for display
            metadata_str = ""
            if metadata:
                metadata_str = f"""
**Metadata:**
- Duration: {metadata.get('duration', 0):.2f} seconds
- File size: {metadata.get('filesize', 0) / (1024*1024):.2f} MB
- Bitrate: {metadata.get('bitrate', 0) / 1000:.0f} kbps
- Encoder: {metadata.get('encoder', 'Unknown')}
"""
            
            return permanent_output, f"Processing complete!\n{metadata_str}", thumbnail_path
            
    except json.JSONDecodeError:
        return None, "Invalid JSON format", None
    except requests.RequestException as e:
        return None, f"API request error: {str(e)}", None
    except Exception as e:
        return None, f"Processing error: {str(e)}", None

# Create Gradio interface
def create_interface():
    with gr.Blocks(title="FFmpeg Video Processor") as app:
        gr.Markdown("# FFmpeg Video Processor")
        gr.Markdown("Process videos using FFmpeg with JSON configuration from API or direct input.")
        
        with gr.Row():
            with gr.Column():
                api_url_input = gr.Textbox(
                    label="API URL (optional)",
                    placeholder="https://api.example.com/ffmpeg-job",
                    lines=1
                )
                
                json_input = gr.Textbox(
                    label="JSON Input (used if no API URL provided)",
                    placeholder='{"inputs": [...], "filters": [...], "outputs": [...]}',
                    lines=15,
                    value=json.dumps({
                        "inputs": [
                            {"file_url": "https://example.com/video1.mp4"},
                            {"file_url": "https://example.com/video2.mp4"}
                        ],
                        "filters": [
                            {"filter": "[0:v][1:v]concat=n=2:v=1:a=0[outv]"}
                        ],
                        "outputs": [
                            {
                                "options": [
                                    {"option": "-map", "argument": "[outv]"},
                                    {"option": "-c:v", "argument": "libx264"}
                                ]
                            }
                        ],
                        "metadata": {
                            "thumbnail": True,
                            "filesize": True,
                            "duration": True
                        },
                        "id": "example_job"
                    }, indent=2)
                )
                
                process_btn = gr.Button("Process Video", variant="primary")
                
            with gr.Column():
                output_video = gr.Video(label="Processed Video")
                output_thumbnail = gr.Image(label="Thumbnail", visible=False)
                status_text = gr.Textbox(label="Status", lines=8)
                download_file = gr.File(label="Download Processed Video", visible=False)
        
        def process_and_update(api_url, json_str):
            output_path, status, thumbnail = process_ffmpeg_job(json_str, api_url)
            
            if output_path and os.path.exists(output_path):
                return (
                    output_path,  # video
                    status,       # status text
                    output_path,  # download file
                    gr.update(visible=True),  # show download
                    thumbnail,    # thumbnail
                    gr.update(visible=bool(thumbnail))  # show thumbnail if exists
                )
            else:
                return (
                    None,         # video
                    status,       # status text
                    None,         # download file
                    gr.update(visible=False),  # hide download
                    None,         # thumbnail
                    gr.update(visible=False)   # hide thumbnail
                )
        
        process_btn.click(
            fn=process_and_update,
            inputs=[api_url_input, json_input],
            outputs=[output_video, status_text, download_file, download_file, output_thumbnail, output_thumbnail]
        )
        
        gr.Markdown("""
        ## Instructions:
        1. Either provide an API URL that returns the JSON configuration, or paste the JSON directly
        2. The JSON should contain:
           - `inputs`: Array of input files with `file_url`
           - `filters`: Array of FFmpeg filter strings
           - `outputs`: Array of output options
           - `metadata`: Optional metadata extraction settings
           - `id`: Job identifier
        3. Click "Process Video" to start processing
        4. The processed video will be displayed and available for download
        """)
    
    return app

# Create and launch the app
if __name__ == "__main__":
    app = create_interface()
    app.launch()