Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| import subprocess | |
| import os | |
| import tempfile | |
| import shutil | |
| from urllib.parse import urlparse | |
| import time | |
def download_file(url, dest_path):
    """Download a file from a URL to a destination path.

    The response body is streamed to disk in 8 KiB chunks rather than being
    read into memory in one piece.

    Args:
        url: URL to fetch with an HTTP GET (30 second timeout).
        dest_path: Local path the body is written to, opened in binary mode.

    Returns:
        True when the whole body was written, False on any failure. Errors
        are printed instead of raised so callers can branch on the result.
    """
    file_created = False
    try:
        # Using the response as a context manager releases the streamed
        # connection even when the status check or a later read fails.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                file_created = True
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return True
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        if file_created:
            # Only remove what this call created/truncated, so a failure
            # never leaves a partial file that looks like a valid download.
            try:
                os.remove(dest_path)
            except OSError:
                pass
        return False
def extract_metadata(video_path):
    """Extract container-level metadata from a video file using ffprobe.

    Args:
        video_path: Path of the media file passed to ``ffprobe``.

    Returns:
        A dict with ``duration`` (float), ``filesize`` (int), ``bitrate``
        (int) and ``encoder`` (str, ``'Unknown'`` when the tag is absent),
        all read from the ``format`` section of ffprobe's JSON output.
        An empty dict is returned on every failure: ffprobe missing,
        non-zero exit status, or unparsable output.
    """
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', '-show_streams', video_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            # Previously this path fell through and returned None; keep the
            # return type consistent with the exception path below.
            print(
                "Error extracting metadata: "
                f"ffprobe exited with code {result.returncode}"
            )
            return {}

        format_info = json.loads(result.stdout).get('format', {})
        return {
            'duration': float(format_info.get('duration', 0)),
            'filesize': int(format_info.get('size', 0)),
            'bitrate': int(format_info.get('bit_rate', 0)),
            'encoder': format_info.get('tags', {}).get('encoder', 'Unknown'),
        }
    except Exception as e:
        print(f"Error extracting metadata: {str(e)}")
        return {}
def generate_thumbnail(video_path, thumbnail_path):
    """Generate a thumbnail image from a video with ffmpeg.

    A single frame is grabbed at the one-second mark and written to
    ``thumbnail_path`` (overwriting any existing file because of ``-y``).

    Args:
        video_path: Path of the source video.
        thumbnail_path: Path the image is written to.

    Returns:
        True only when ffmpeg exited successfully and the thumbnail file
        exists afterwards; False otherwise, including when ffmpeg cannot be
        started.
    """
    cmd = [
        'ffmpeg', '-i', video_path, '-ss', '00:00:01',
        '-vframes', '1', '-q:v', '2', thumbnail_path, '-y'
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except Exception as e:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print(f"Error generating thumbnail: {str(e)}")
        return False
    # Checking the exit status as well prevents a stale file left at
    # thumbnail_path by an earlier run from being reported as success.
    return result.returncode == 0 and os.path.exists(thumbnail_path)
def _sanitize_job_id(raw_id):
    """Return a file-name-safe token derived from the job's ``id`` value."""
    # The id comes from untrusted JSON and is embedded in output file names;
    # replacing separators and other special characters keeps the resulting
    # paths inside the directories they are joined to.
    cleaned = ''.join(
        ch if ch.isalnum() or ch in '._-' else '_' for ch in str(raw_id)
    )
    return cleaned or 'output'


def _build_ffmpeg_command(job_data, input_files, output_path):
    """Assemble the ffmpeg argument list for one job description."""
    cmd = ['ffmpeg']
    for input_file in input_files:
        cmd.extend(['-i', input_file])

    # All filter strings are merged into a single -filter_complex graph.
    filters = [
        item.get('filter', '') for item in (job_data.get('filters') or [])
    ]
    filters = [flt for flt in filters if flt]
    if filters:
        cmd.extend(['-filter_complex', ';'.join(filters)])

    for output in job_data.get('outputs') or []:
        for option in output.get('options') or []:
            opt = option.get('option', '')
            if not opt:
                continue
            cmd.append(str(opt))
            arg = option.get('argument', '')
            # JSON numbers (e.g. {"option": "-crf", "argument": 23}) must be
            # stringified for subprocess, and a numeric 0 is a real value
            # rather than "no argument".
            is_number = (
                isinstance(arg, (int, float)) and not isinstance(arg, bool)
            )
            if arg or is_number:
                cmd.append(str(arg))

    cmd.append(output_path)
    return cmd


def process_ffmpeg_job(json_input, api_url=None):
    """Process an FFmpeg job described by JSON text or fetched from an API URL.

    The job object may contain ``inputs`` (items with ``file_url``),
    ``filters`` (items with ``filter``), ``outputs`` (items with ``options``
    made of ``option``/``argument`` pairs), ``metadata`` (flags selecting
    which extras to produce) and ``id`` (used in the output file names).

    Inputs are downloaded into a temporary directory, ffmpeg is run there,
    and the resulting video (plus the thumbnail, when requested and
    generated) is copied into the current working directory.

    Args:
        json_input: Job description as JSON text; used when ``api_url`` is
            empty.
        api_url: Optional URL whose JSON response is the job description.

    Returns:
        A ``(output_path, status_message, thumbnail_path)`` tuple. On failure
        ``output_path`` and ``thumbnail_path`` are None and the message
        describes the error; no exception is raised to the caller.
    """
    try:
        # Load the job description from the API when a URL is given,
        # otherwise from the pasted JSON text.
        if api_url and api_url.strip():
            response = requests.get(api_url.strip(), timeout=30)
            response.raise_for_status()
            job_data = response.json()
        else:
            # `or ''` turns a missing input into a regular JSON error.
            job_data = json.loads(json_input or '')

        if not isinstance(job_data, dict):
            return None, "Job description must be a JSON object", None

        job_id = _sanitize_job_id(job_data.get('id', 'output'))
        requested = job_data.get('metadata') or {}

        with tempfile.TemporaryDirectory() as temp_dir:
            # Download every input; a single failed download aborts the job.
            input_files = []
            for i, input_item in enumerate(job_data.get('inputs') or []):
                file_url = input_item.get('file_url')
                if not file_url:
                    continue

                # Keep the URL's base name so ffmpeg sees the extension; the
                # index prefix keeps equal names from colliding.
                filename = os.path.basename(urlparse(file_url).path)
                if not filename:
                    filename = f"input_{i}"
                dest_path = os.path.join(temp_dir, f"{i}_{filename}")

                print(f"Downloading {file_url}...")
                if not download_file(file_url, dest_path):
                    return None, f"Failed to download: {file_url}", None
                input_files.append(dest_path)

            if not input_files:
                return None, "No input files to process", None

            output_filename = f"{job_id}_output.mp4"
            output_path = os.path.join(temp_dir, output_filename)
            cmd = _build_ffmpeg_command(job_data, input_files, output_path)

            print("Executing FFmpeg command...")
            print(' '.join(cmd))
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                return None, f"FFmpeg error: {result.stderr}", None

            if not os.path.exists(output_path):
                return None, "Output file was not created", None

            # Probe the output only when at least one metadata field is asked for.
            metadata = {}
            wanted_fields = ('duration', 'filesize', 'bitrate', 'encoder')
            if any(requested.get(field) for field in wanted_fields):
                metadata = extract_metadata(output_path) or {}

            # The temporary directory disappears when this block exits, so
            # anything handed back to the caller is copied out first.
            thumbnail_path = None
            if requested.get('thumbnail'):
                thumb_filename = f"{job_id}_thumbnail.jpg"
                thumb_path = os.path.join(temp_dir, thumb_filename)
                if generate_thumbnail(output_path, thumb_path):
                    perm_thumb_path = os.path.join(".", thumb_filename)
                    shutil.copy2(thumb_path, perm_thumb_path)
                    thumbnail_path = perm_thumb_path

            permanent_output = os.path.join(".", output_filename)
            shutil.copy2(output_path, permanent_output)

            metadata_str = ""
            if metadata:
                metadata_str = (
                    "\n**Metadata:**\n"
                    f"- Duration: {metadata.get('duration', 0):.2f} seconds\n"
                    f"- File size: {metadata.get('filesize', 0) / (1024*1024):.2f} MB\n"
                    f"- Bitrate: {metadata.get('bitrate', 0) / 1000:.0f} kbps\n"
                    f"- Encoder: {metadata.get('encoder', 'Unknown')}\n"
                )

            return (
                permanent_output,
                f"Processing complete!\n{metadata_str}",
                thumbnail_path,
            )

    except json.JSONDecodeError:
        return None, "Invalid JSON format", None
    except requests.RequestException as e:
        return None, f"API request error: {str(e)}", None
    except Exception as e:
        return None, f"Processing error: {str(e)}", None
| # Create Gradio interface | |
def create_interface():
    """Build the Gradio Blocks UI for submitting FFmpeg jobs.

    The left column collects an optional API URL and the job JSON (pre-filled
    with an example); the right column shows the processed video, the
    thumbnail, the status text and a download link. Clicking the button runs
    ``process_ffmpeg_job`` and updates the right column.

    Returns:
        The ``gr.Blocks`` application; the caller is responsible for
        launching it.
    """
    example_job = {
        "inputs": [
            {"file_url": "https://example.com/video1.mp4"},
            {"file_url": "https://example.com/video2.mp4"}
        ],
        "filters": [
            {"filter": "[0:v][1:v]concat=n=2:v=1:a=0[outv]"}
        ],
        "outputs": [
            {
                "options": [
                    {"option": "-map", "argument": "[outv]"},
                    {"option": "-c:v", "argument": "libx264"}
                ]
            }
        ],
        "metadata": {
            "thumbnail": True,
            "filesize": True,
            "duration": True
        },
        "id": "example_job"
    }

    with gr.Blocks(title="FFmpeg Video Processor") as app:
        gr.Markdown("# FFmpeg Video Processor")
        gr.Markdown("Process videos using FFmpeg with JSON configuration from API or direct input.")

        with gr.Row():
            with gr.Column():
                api_url_input = gr.Textbox(
                    label="API URL (optional)",
                    placeholder="https://api.example.com/ffmpeg-job",
                    lines=1
                )
                json_input = gr.Textbox(
                    label="JSON Input (used if no API URL provided)",
                    placeholder='{"inputs": [...], "filters": [...], "outputs": [...]}',
                    lines=15,
                    value=json.dumps(example_job, indent=2)
                )
                process_btn = gr.Button("Process Video", variant="primary")

            with gr.Column():
                output_video = gr.Video(label="Processed Video")
                output_thumbnail = gr.Image(label="Thumbnail", visible=False)
                status_text = gr.Textbox(label="Status", lines=8)
                # Named download_output so the module-level download_file()
                # helper is not shadowed inside this function.
                download_output = gr.File(label="Download Processed Video", visible=False)

        def process_and_update(api_url, json_str):
            """Run the job and map its result onto the four output components."""
            output_path, status, thumbnail = process_ffmpeg_job(json_str, api_url)
            succeeded = bool(output_path) and os.path.exists(output_path)
            if not succeeded:
                return (
                    None,                                  # video
                    status,                                # status text
                    gr.update(value=None, visible=False),  # hide download
                    gr.update(value=None, visible=False),  # hide thumbnail
                )
            # One update per component carries both the new value and the
            # visibility, so no component appears twice in `outputs`.
            return (
                output_path,                                          # video
                status,                                               # status text
                gr.update(value=output_path, visible=True),           # download
                gr.update(value=thumbnail, visible=bool(thumbnail)),  # thumbnail
            )

        process_btn.click(
            fn=process_and_update,
            inputs=[api_url_input, json_input],
            outputs=[output_video, status_text, download_output, output_thumbnail]
        )

        gr.Markdown("""
        ## Instructions:
        1. Either provide an API URL that returns the JSON configuration, or paste the JSON directly
        2. The JSON should contain:
           - `inputs`: Array of input files with `file_url`
           - `filters`: Array of FFmpeg filter strings
           - `outputs`: Array of output options
           - `metadata`: Optional metadata extraction settings
           - `id`: Job identifier
        3. Click "Process Video" to start processing
        4. The processed video will be displayed and available for download
        """)

    return app
| # Create and launch the app | |
if __name__ == "__main__":
    # Runs only when this file is executed as a script, so importing the
    # module does not build or start the UI.
    app = create_interface()
    # Start serving the Blocks app with Gradio's default launch settings.
    app.launch()