"""Gradio front end that runs FFmpeg jobs described by a JSON document.

A job is a JSON object read either from a text box or from an API URL. The
keys this module reads are ``inputs`` (objects with ``file_url``), ``filters``
(objects with ``filter``), ``outputs`` (objects with ``options``, each an
``option``/``argument`` pair), ``metadata`` (flags selecting what to report)
and ``id`` (used to name the result files).
"""

import json
import os
import shutil
import subprocess
import tempfile
import time
from urllib.parse import urlparse

import gradio as gr
import requests

# Flags under the job's "metadata" key that trigger an ffprobe run.
_METADATA_FLAGS = ('duration', 'filesize', 'bitrate', 'encoder')

# Characters (besides alphanumerics) allowed in a job id used as a file name.
_SAFE_ID_PUNCTUATION = '-_.'


def _remove_quietly(path):
    """Delete *path* if it exists; ignore the error when it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def download_file(url, dest_path):
    """Download *url* to *dest_path*.

    Args:
        url: Address of the file to fetch.
        dest_path: Local path the body is written to.

    Returns:
        True when the whole body was written, False on any failure. On
        failure the partially written file is removed.
    """
    try:
        # The context manager releases the streamed connection back to the
        # pool even when the transfer is interrupted.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return True
    except Exception as e:  # deliberate best-effort: callers only get a bool
        print(f"Error downloading {url}: {str(e)}")
        _remove_quietly(dest_path)
        return False


def _to_number(value, cast, default=0):
    """Convert *value* with *cast*, returning *default* when it is not numeric."""
    try:
        return cast(value)
    except (TypeError, ValueError):
        return default


def extract_metadata(video_path):
    """Extract container-level metadata from a video file using ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        A dict with ``duration`` (float), ``filesize`` (int), ``bitrate``
        (int) and ``encoder`` (str), or an empty dict when ffprobe cannot be
        run, exits with an error, or prints something that is not a JSON
        object. A field that is missing or not numeric is reported as 0
        instead of discarding the whole result.
    """
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format',
        '-show_streams',
        video_path,
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL
        )
        if result.returncode != 0:
            print(
                "Error extracting metadata: "
                f"ffprobe exited with code {result.returncode}"
            )
            return {}
        data = json.loads(result.stdout)
    except (OSError, ValueError) as e:
        # OSError: ffprobe missing or not executable; ValueError: bad JSON.
        print(f"Error extracting metadata: {str(e)}")
        return {}

    if not isinstance(data, dict):
        return {}
    format_info = data.get('format') or {}
    tags = format_info.get('tags') or {}
    return {
        'duration': _to_number(format_info.get('duration', 0), float, 0.0),
        'filesize': _to_number(format_info.get('size', 0), int),
        'bitrate': _to_number(format_info.get('bit_rate', 0), int),
        'encoder': tags.get('encoder', 'Unknown'),
    }


def generate_thumbnail(video_path, thumbnail_path):
    """Write a single-frame JPEG thumbnail of *video_path*.

    The frame is taken one second in; if that yields no image (for example
    a clip shorter than one second) the first frame is used instead.

    Args:
        video_path: Path of the source video.
        thumbnail_path: Path the image is written to (overwritten if present).

    Returns:
        True when ffmpeg succeeded and the thumbnail file exists, else False.
    """
    for seek_position in ('00:00:01', '00:00:00'):
        cmd = [
            'ffmpeg',
            '-y',
            '-i', video_path,
            '-ss', seek_position,
            '-vframes', '1',
            '-q:v', '2',
            thumbnail_path,
        ]
        try:
            result = subprocess.run(
                cmd, capture_output=True, stdin=subprocess.DEVNULL
            )
        except OSError:
            # ffmpeg is missing or not executable; retrying cannot help.
            return False
        if result.returncode == 0 and os.path.exists(thumbnail_path):
            return True
    return False


def _load_job(json_input, api_url):
    """Return the parsed job document, fetched from *api_url* when one is given."""
    if api_url and api_url.strip():
        response = requests.get(api_url.strip(), timeout=30)
        response.raise_for_status()
        return response.json()
    return json.loads(json_input or '')


def _safe_job_id(raw_id):
    """Reduce the job's id to characters that cannot form a path component."""
    text = '' if raw_id is None else str(raw_id)
    cleaned = ''.join(
        ch if (ch.isalnum() or ch in _SAFE_ID_PUNCTUATION) else '_'
        for ch in text
    )
    # Leading dots would yield hidden files or a ".." prefix.
    return cleaned.lstrip('.') or 'output'


def _download_inputs(job_data, temp_dir):
    """Download every input with a ``file_url``; return ``(paths, error)``."""
    input_files = []
    for index, input_item in enumerate(job_data.get('inputs') or []):
        file_url = input_item.get('file_url')
        if not file_url:
            continue

        # Keep the remote file name so ffmpeg can see its extension.
        filename = os.path.basename(urlparse(file_url).path)
        if not filename:
            filename = f"input_{index}"
        dest_path = os.path.join(temp_dir, f"{index}_{filename}")

        print(f"Downloading {file_url}...")
        if not download_file(file_url, dest_path):
            return [], f"Failed to download: {file_url}"
        input_files.append(dest_path)
    return input_files, None


def _build_ffmpeg_command(job_data, input_files, output_path):
    """Assemble the ffmpeg argument list for the job."""
    cmd = ['ffmpeg']
    for input_file in input_files:
        cmd.extend(['-i', input_file])

    filter_complex = [
        str(filter_item.get('filter'))
        for filter_item in job_data.get('filters') or []
        if filter_item.get('filter')
    ]
    if filter_complex:
        cmd.extend(['-filter_complex', ';'.join(filter_complex)])

    # NOTE(security): options are taken verbatim from the job document. They
    # are passed as a list (no shell), but they can still make ffmpeg read or
    # write other paths. Only accept jobs from trusted sources.
    for output in job_data.get('outputs') or []:
        for option in output.get('options') or []:
            opt = option.get('option', '')
            if not opt:
                continue
            cmd.append(str(opt))
            arg = option.get('argument', '')
            # Compare against None/'' explicitly so a numeric 0 is kept.
            if arg is not None and arg != '':
                cmd.append(str(arg))

    cmd.append(output_path)
    return cmd


def _format_metadata(metadata):
    """Render the metadata dict as the block appended to the status text."""
    if not metadata:
        return ""
    lines = [
        "",
        "**Metadata:**",
        f"- Duration: {metadata.get('duration', 0):.2f} seconds",
        f"- File size: {metadata.get('filesize', 0) / (1024*1024):.2f} MB",
        f"- Bitrate: {metadata.get('bitrate', 0) / 1000:.0f} kbps",
        f"- Encoder: {metadata.get('encoder', 'Unknown')}",
        "",
    ]
    return "\n".join(lines)


def process_ffmpeg_job(json_input, api_url=None):
    """Process an FFmpeg job given as JSON text or fetched from an API URL.

    Inputs are downloaded into a temporary directory, ffmpeg is run there,
    and the result (plus an optional thumbnail) is copied into the current
    working directory as ``<id>_output.mp4`` / ``<id>_thumbnail.jpg``.

    Args:
        json_input: Job description as a JSON string; used when *api_url*
            is empty.
        api_url: Optional URL returning the job description as JSON.

    Returns:
        A tuple ``(output_path, status_message, thumbnail_path)``. On
        failure ``output_path`` and ``thumbnail_path`` are None and the
        message describes the error; no exception is propagated.
    """
    try:
        job_data = _load_job(json_input, api_url)
        if not isinstance(job_data, dict):
            return None, "Job description must be a JSON object", None

        # The id ends up in file names written to the working directory, so
        # it must not be able to introduce path separators.
        job_id = _safe_job_id(job_data.get('id', 'output'))

        requested = job_data.get('metadata')
        if not isinstance(requested, dict):
            requested = {}

        with tempfile.TemporaryDirectory() as temp_dir:
            input_files, error = _download_inputs(job_data, temp_dir)
            if error:
                return None, error, None
            if not input_files:
                return None, "No input files to process", None

            output_filename = f"{job_id}_output.mp4"
            output_path = os.path.join(temp_dir, output_filename)
            cmd = _build_ffmpeg_command(job_data, input_files, output_path)

            print("Executing FFmpeg command...")
            print(' '.join(cmd))
            # stdin is detached so ffmpeg can never block on a prompt.
            result = subprocess.run(
                cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL
            )
            if result.returncode != 0:
                return None, f"FFmpeg error: {result.stderr}", None
            if not os.path.exists(output_path):
                return None, "Output file was not created", None

            metadata = {}
            if any(requested.get(flag) for flag in _METADATA_FLAGS):
                metadata = extract_metadata(output_path)

            thumbnail_path = None
            if requested.get('thumbnail'):
                thumb_filename = f"{job_id}_thumbnail.jpg"
                thumb_path = os.path.join(temp_dir, thumb_filename)
                if generate_thumbnail(output_path, thumb_path):
                    # The temp dir vanishes on exit, so keep a copy.
                    perm_thumb_path = os.path.join(".", thumb_filename)
                    shutil.copy2(thumb_path, perm_thumb_path)
                    thumbnail_path = perm_thumb_path

            permanent_output = os.path.join(".", output_filename)
            shutil.copy2(output_path, permanent_output)

            metadata_str = _format_metadata(metadata)
            return (
                permanent_output,
                f"Processing complete!\n{metadata_str}",
                thumbnail_path,
            )

    except json.JSONDecodeError:
        return None, "Invalid JSON format", None
    except requests.RequestException as e:
        return None, f"API request error: {str(e)}", None
    except Exception as e:  # UI boundary: report instead of crashing
        return None, f"Processing error: {str(e)}", None


# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks app and wire the button to the job runner.

    Returns:
        The ``gr.Blocks`` application, not yet launched.
    """
    example_job = {
        "inputs": [
            {"file_url": "https://example.com/video1.mp4"},
            {"file_url": "https://example.com/video2.mp4"}
        ],
        "filters": [
            {"filter": "[0:v][1:v]concat=n=2:v=1:a=0[outv]"}
        ],
        "outputs": [
            {
                "options": [
                    {"option": "-map", "argument": "[outv]"},
                    {"option": "-c:v", "argument": "libx264"}
                ]
            }
        ],
        "metadata": {
            "thumbnail": True,
            "filesize": True,
            "duration": True
        },
        "id": "example_job"
    }

    with gr.Blocks(title="FFmpeg Video Processor") as app:
        gr.Markdown("# FFmpeg Video Processor")
        gr.Markdown("Process videos using FFmpeg with JSON configuration from API or direct input.")

        with gr.Row():
            with gr.Column():
                api_url_input = gr.Textbox(
                    label="API URL (optional)",
                    placeholder="https://api.example.com/ffmpeg-job",
                    lines=1
                )
                json_input = gr.Textbox(
                    label="JSON Input (used if no API URL provided)",
                    placeholder='{"inputs": [...], "filters": [...], "outputs": [...]}',
                    lines=15,
                    value=json.dumps(example_job, indent=2)
                )
                process_btn = gr.Button("Process Video", variant="primary")

            with gr.Column():
                output_video = gr.Video(label="Processed Video")
                output_thumbnail = gr.Image(label="Thumbnail", visible=False)
                status_text = gr.Textbox(label="Status", lines=8)
                # Named so it does not shadow the module-level download_file().
                download_output = gr.File(
                    label="Download Processed Video", visible=False
                )

        def process_and_update(api_url, json_str):
            """Run the job and map its result onto the four output components."""
            output_path, status, thumbnail = process_ffmpeg_job(json_str, api_url)

            if output_path and os.path.exists(output_path):
                return (
                    output_path,  # video
                    status,  # status text
                    # Value and visibility go in one update per component.
                    gr.update(value=output_path, visible=True),
                    gr.update(value=thumbnail, visible=bool(thumbnail)),
                )
            return (
                None,  # video
                status,  # status text
                gr.update(value=None, visible=False),  # hide download
                gr.update(value=None, visible=False),  # hide thumbnail
            )

        process_btn.click(
            fn=process_and_update,
            inputs=[api_url_input, json_input],
            outputs=[output_video, status_text, download_output, output_thumbnail]
        )

        gr.Markdown("""
        ## Instructions:
        1. Either provide an API URL that returns the JSON configuration, or paste the JSON directly
        2. The JSON should contain:
           - `inputs`: Array of input files with `file_url`
           - `filters`: Array of FFmpeg filter strings
           - `outputs`: Array of output options
           - `metadata`: Optional metadata extraction settings
           - `id`: Job identifier
        3. Click "Process Video" to start processing
        4. The processed video will be displayed and available for download
        """)

    return app


# Create and launch the app
if __name__ == "__main__":
    app = create_interface()
    app.launch()