"""Gradio front end that turns natural-language requests into FFmpeg commands."""

import atexit
import json
import os
import re
import shlex
import shutil
import subprocess
import tempfile
from typing import List, Dict, Any, Optional, Tuple

import gradio as gr

# Media extensions the parser recognises inside quoted file names.
_MEDIA_EXTENSIONS = r'mp4|avi|mov|mkv|flv|webm|mp3|wav|flac|aac'

# A quoted file name ending in one of the known extensions. Matching is
# case-insensitive so 'Clip.MP4' is found without lowercasing the path itself.
_FILE_PATTERN = re.compile(
    r'["\']([^"\']+\.(?:' + _MEDIA_EXTENSIONS + r'))["\']',
    re.IGNORECASE,
)

# Upper bound on how long a single ffmpeg run may take, in seconds.
_EXEC_TIMEOUT_SECONDS = 300


class FFmpegAgent:
    """Rule-based agent that builds and runs FFmpeg commands from plain text.

    Attributes:
        command_history: One dict per executed command with the keys
            ``command``, ``stdout``, ``stderr`` and ``returncode``.
        temp_dir: Scratch directory created for this agent; removed at
            interpreter exit.
    """

    def __init__(self):
        self.command_history = []
        self.temp_dir = tempfile.mkdtemp()
        # The directory used to be leaked on every start; clean it up on exit.
        atexit.register(shutil.rmtree, self.temp_dir, ignore_errors=True)

    def parse_natural_language(self, text: str) -> Dict[str, Any]:
        """Parse a natural-language request into FFmpeg command parameters.

        File names must be quoted and end in a known media extension; the
        first one is the input, the second (if any) the output. File names
        keep their original case. Keywords are matched case-insensitively
        against the text with the file names removed, so a name such as
        ``'cut_scene.mp4'`` does not trigger a trim.

        Args:
            text: The user's request.

        Returns:
            A dict with the keys ``input_file``, ``output_file``,
            ``operations``, ``quality``, ``format``, ``codec``,
            ``resolution``, ``fps``, ``bitrate``, ``duration``,
            ``start_time`` and ``audio_ops``.
        """
        original = (text or "").strip()

        params = {
            'input_file': None,
            'output_file': None,
            'operations': [],
            'quality': 'medium',
            'format': None,
            'codec': None,
            'resolution': None,
            'fps': None,
            'bitrate': None,
            'duration': None,
            'start_time': None,
            'audio_ops': [],
        }

        # Extract file paths from the ORIGINAL text: lowercasing them would
        # break lookups on case-sensitive filesystems.
        files = _FILE_PATTERN.findall(original)
        if files:
            params['input_file'] = files[0]
            if len(files) > 1:
                params['output_file'] = files[1]

        # Keyword detection runs on lowercase text without the file names.
        text = _FILE_PATTERN.sub(' ', original).lower()

        if any(word in text for word in ['resize', 'scale', 'resolution']):
            res_match = re.search(r'(\d{3,4})x(\d{3,4})', text)
            if res_match:
                params['resolution'] = f"{res_match.group(1)}x{res_match.group(2)}"
            elif '720p' in text:
                params['resolution'] = "1280x720"
            elif '1080p' in text:
                params['resolution'] = "1920x1080"
            elif '4k' in text:
                params['resolution'] = "3840x2160"

        if any(word in text for word in ['compress', 'reduce', 'smaller']):
            params['operations'].append('compress')

        # Quality applies to every request, not only to "compress"; whole-word
        # matching keeps "slow" from being read as "low".
        if re.search(r'\bhigh\b', text):
            params['quality'] = 'high'
        elif re.search(r'\blow\b', text):
            params['quality'] = 'low'

        if any(word in text for word in ['convert', 'format']):
            if params['output_file']:
                params['format'] = params['output_file'].rsplit('.', 1)[1].lower()
            else:
                # No output file given: accept phrases such as "to MP4".
                format_match = re.search(
                    r'\bto\s+\.?(' + _MEDIA_EXTENSIONS + r')\b', text
                )
                if format_match:
                    params['format'] = format_match.group(1)

        if 'fps' in text:
            fps_match = re.search(r'(\d+)\s*fps', text)
            if fps_match:
                params['fps'] = int(fps_match.group(1))

        if 'trim' in text or 'cut' in text:
            params['operations'].append('trim')
            times = re.findall(r'(\d{1,2}):(\d{2}):(\d{2})', text)
            if times:
                start = times[0]
                if len(times) >= 2:
                    # Tolerate "from 00:00:30 to 00:00:10": earlier stamp starts.
                    start, end = sorted(times[:2], key=self._to_seconds)
                    params['duration'] = self._calculate_duration(start, end)
                params['start_time'] = ":".join(start)

        if any(word in text for word in ['extract audio', 'audio only']):
            params['operations'].append('extract_audio')

        if 'mute' in text:
            params['audio_ops'].append('mute')

        if 'normalize' in text:
            params['audio_ops'].append('normalize')

        return params

    @staticmethod
    def _to_seconds(stamp: Tuple) -> int:
        """Convert an ``(hours, minutes, seconds)`` tuple to whole seconds."""
        return int(stamp[0]) * 3600 + int(stamp[1]) * 60 + int(stamp[2])

    def _calculate_duration(self, start: Tuple, end: Tuple) -> str:
        """Return the time between two timestamps as ``HH:MM:SS``.

        Args:
            start: ``(hours, minutes, seconds)`` as ints or numeric strings.
            end: Same shape as ``start``.

        Returns:
            The absolute difference, so swapped arguments never produce a
            negative (and therefore malformed) duration string.
        """
        duration_seconds = abs(self._to_seconds(end) - self._to_seconds(start))
        hours, remainder = divmod(duration_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def generate_command(self, params: Dict[str, Any]) -> str:
        """Build a shell-quoted FFmpeg command line from parsed parameters.

        Args:
            params: Dict as produced by ``parse_natural_language``; missing
                keys are treated as unset.

        Returns:
            The command as a single string, or the literal
            ``"Error: No input file specified"`` when there is no input file.
        """
        input_file = params.get('input_file')
        if not input_file:
            return "Error: No input file specified"

        operations = params.get('operations') or []
        audio_ops = params.get('audio_ops') or []
        audio_only = 'extract_audio' in operations

        # Global options must precede the output file: ffmpeg ignores options
        # that trail the last output, so "-y" goes first.
        cmd = ['ffmpeg', '-y', '-i', shlex.quote(input_file)]

        if params.get('start_time'):
            cmd.extend(['-ss', params['start_time']])
        if params.get('duration'):
            cmd.extend(['-t', params['duration']])

        if audio_only:
            # Drop the video stream and let ffmpeg pick the encoder from the
            # output extension; stream copy fails whenever the source codec
            # does not fit the target container (e.g. AAC into .mp3).
            cmd.append('-vn')
        else:
            if params.get('resolution'):
                cmd.extend(['-vf', f"scale={params['resolution']}"])
            if params.get('fps'):
                cmd.extend(['-r', str(params['fps'])])

            quality = params.get('quality')
            if quality == 'high':
                cmd.extend(['-crf', '18'])
            elif quality == 'low':
                cmd.extend(['-crf', '28'])
            else:
                cmd.extend(['-crf', '23'])

        if 'mute' in audio_ops and not audio_only:
            # Muting wins over filtering: there is no audio left to normalise.
            cmd.append('-an')
        elif 'normalize' in audio_ops:
            cmd.extend(['-af', 'loudnorm=I=-16:LRA=11:TP=-1.5'])

        if params.get('output_file'):
            cmd.append(shlex.quote(params['output_file']))
        else:
            # Derive "<input>_processed.<ext>" next to the input file.
            input_name = os.path.splitext(input_file)[0]
            output_ext = params.get('format') or 'mp4'
            cmd.append(shlex.quote(f"{input_name}_processed.{output_ext}"))

        return ' '.join(cmd)

    def execute_command(self, command: str) -> Tuple[str, str]:
        """Run an FFmpeg command line and record it in the history.

        Only commands whose program is exactly ``ffmpeg`` are run, because the
        command text comes from an editable UI field. NOTE: ffmpeg itself can
        still read and write arbitrary paths; do not expose this publicly
        without sandboxing.

        Args:
            command: Shell-style command line, e.g. from ``generate_command``.

        Returns:
            ``(status, output)``. On success ``output`` is stdout, or ffmpeg's
            stderr log when stdout is empty; on failure it is stderr. Commands
            that are rejected or cannot be started are not added to the
            history.
        """
        try:
            cmd_args = shlex.split(command or "")
        except ValueError as e:  # e.g. unbalanced quotes
            return f"❌ Error: {str(e)}", ""

        if not cmd_args:
            return "❌ Error: No command to execute", ""
        if cmd_args[0] != 'ffmpeg':
            return "❌ Error: Only ffmpeg commands can be executed", ""

        try:
            result = subprocess.run(
                cmd_args,
                capture_output=True,
                text=True,
                timeout=_EXEC_TIMEOUT_SECONDS,
                # Never let ffmpeg wait on an interactive prompt.
                stdin=subprocess.DEVNULL,
            )
        except subprocess.TimeoutExpired:
            return "❌ Error: Command timed out after 5 minutes", ""
        except (OSError, ValueError) as e:  # ffmpeg missing, undecodable output
            return f"❌ Error: {str(e)}", ""

        self.command_history.append({
            'command': command,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode,
        })

        if result.returncode == 0:
            # ffmpeg writes its progress log to stderr, so stdout is usually empty.
            return "✅ Command executed successfully!", result.stdout or result.stderr
        return f"❌ Error (code {result.returncode})", result.stderr

    def get_preset_commands(self) -> List[Dict[str, str]]:
        """Return the preset request templates.

        Each entry has ``name``, ``description`` and ``template``; templates
        contain ``{input}`` / ``{output}`` placeholders that
        ``process_request`` fills in.
        """
        return [
            {
                "name": "Compress Video",
                "description": "Compress video for web sharing",
                "template": "Compress '{input}' to '{output}' with medium quality",
            },
            {
                "name": "Resize to 1080p",
                "description": "Resize video to 1080p resolution",
                "template": "Resize '{input}' to 1920x1080 and save as '{output}'",
            },
            {
                "name": "Extract Audio",
                "description": "Extract audio from video file",
                # The output name already carries its extension; appending
                # ".mp3" here produced names like "out.mp3.mp3".
                "template": "Extract audio from '{input}' and save as '{output}'",
            },
            {
                "name": "Trim Video",
                "description": "Trim video between timestamps",
                "template": "Trim '{input}' from 00:00:10 to 00:00:30 and save as '{output}'",
            },
            {
                "name": "Convert to MP4",
                "description": "Convert any video to MP4 format",
                "template": "Convert '{input}' to MP4 format and save as '{output}'",
            },
            {
                "name": "Change FPS",
                "description": "Change video frame rate",
                "template": "Change '{input}' to 30 fps and save as '{output}'",
            },
        ]


# Initialize the agent
agent = FFmpegAgent()


def _uploaded_path(upload: Any) -> Optional[str]:
    """Return the filesystem path of an upload value, or None if absent.

    Accepts a plain path string or an object exposing the path as ``.name``.
    """
    if not upload:
        return None
    if isinstance(upload, str):
        return upload
    return getattr(upload, "name", None)


def process_request(user_input: str, input_file: Optional[str] = None, output_file: Optional[str] = None) -> Tuple[str, str, str]:
    """Turn a user request into an FFmpeg command without running it.

    ``{input}`` / ``{output}`` placeholders in the request are replaced by
    the uploaded file and the output name. When the request names no file at
    all, the uploaded file and output name are used directly, so they are not
    silently ignored.

    Args:
        user_input: Natural-language request, optionally with placeholders.
        input_file: Uploaded input file (path), if any.
        output_file: Desired output file name, if any.

    Returns:
        ``(command, parsed parameters as JSON text, "")``; the empty third
        value clears the status field in the UI.
    """
    user_input = user_input or ""
    input_path = _uploaded_path(input_file)
    output_name = (output_file or "").strip()

    if input_path:
        user_input = user_input.replace("{input}", f'"{input_path}"')
    if output_name:
        user_input = user_input.replace("{output}", f'"{output_name}"')

    params = agent.parse_natural_language(user_input)

    # Fall back to the form fields when the text itself named no files.
    if input_path and not params['input_file']:
        params['input_file'] = input_path
    if output_name and not params['output_file']:
        params['output_file'] = output_name

    command = agent.generate_command(params)
    param_text = json.dumps(params, indent=2, default=str)

    return command, param_text, ""


def execute_ffmpeg_command(command: str) -> Tuple[str, str]:
    """Execute the generated FFmpeg command and return ``(status, output)``."""
    status, output = agent.execute_command(command)
    return status, output


def load_preset(preset_name: str) -> str:
    """Return the template of the preset named ``preset_name``, or ``""``."""
    for preset in agent.get_preset_commands():
        if preset["name"] == preset_name:
            return preset["template"]
    return ""


def get_command_history() -> str:
    """Format the ten most recent executed commands, one per line.

    Failed commands are followed by the last line of ffmpeg's stderr, which
    is where the actual error is reported; successful runs show no error
    line even though ffmpeg always logs to stderr.
    """
    if not agent.command_history:
        return "No commands executed yet."

    history_text = []
    for i, entry in enumerate(agent.command_history[-10:], 1):
        failed = entry["returncode"] != 0
        status = "❌" if failed else "✅"
        history_text.append(f"{i}. {status} {entry['command']}")

        stderr_text = (entry["stderr"] or "").strip()
        if failed and stderr_text:
            last_line = stderr_text.splitlines()[-1]
            history_text.append(f"   Error: {last_line[:100]}")

    return "\n".join(history_text)


# Create the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# 🎬 AI FFmpeg Agent")
    gr.Markdown("Generate and execute FFmpeg commands using natural language. [Built with anycoder](https://huggingface.co/spaces/akhaliq/anycoder)")

    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## 📝 Input")

            # User input
            user_input = gr.Textbox(
                label="Describe what you want to do with your media file",
                placeholder="e.g., Resize 'video.mp4' to 720p and compress it, or use the presets below",
                lines=3
            )

            # File inputs
            with gr.Row():
                input_file = gr.File(
                    label="Input File (Optional)",
                    file_types=["video", "audio"]
                )
                output_filename = gr.Textbox(
                    label="Output Filename (Optional)",
                    placeholder="output.mp4"
                )

            # Preset buttons
            gr.Markdown("### 🚀 Quick Presets")
            with gr.Row():
                preset_choices = [p["name"] for p in agent.get_preset_commands()]
                preset_dropdown = gr.Dropdown(
                    choices=preset_choices,
                    label="Select Preset",
                    value=None
                )

            # Generate button
            generate_btn = gr.Button("🔧 Generate Command", variant="primary")

        with gr.Column(scale=1):
            gr.Markdown("## ⚙️ Parsed Parameters")
            params_display = gr.Code(
                label="Parsed Parameters",
                language="json",
                lines=10
            )

    # Generated command section
    gr.Markdown("## 🎯 Generated Command")
    # "shell" is the highlighter name gr.Code accepts for command lines.
    command_output = gr.Code(
        label="FFmpeg Command",
        language="shell",
        lines=3
    )

    with gr.Row():
        execute_btn = gr.Button("▶️ Execute Command", variant="primary")
        clear_btn = gr.Button("🗑️ Clear")

    # Execution results
    gr.Markdown("## 📊 Execution Results")
    with gr.Row():
        with gr.Column():
            status_output = gr.Textbox(
                label="Status",
                interactive=False
            )
        with gr.Column():
            # language=None renders plain text without highlighting.
            result_output = gr.Code(
                label="Output",
                language=None,
                lines=10
            )

    # Command history
    with gr.Accordion("📚 Command History", open=False):
        history_output = gr.Code(
            label="Recent Commands",
            language=None,
            lines=8,
            value=get_command_history()
        )
        refresh_history_btn = gr.Button("🔄 Refresh History")

    # Examples
    gr.Markdown("## 💡 Examples")
    gr.Examples(
        examples=[
            ["Resize 'video.mp4' to 720p and compress it"],
            ["Extract audio from 'movie.mkv' and save as 'audio.mp3'"],
            ["Convert 'video.avi' to MP4 format with high quality"],
            ["Trim 'clip.mp4' from 00:00:10 to 00:00:30"],
            ["Change 'video.mov' to 30 fps and save as 'output.mp4'"],
            ["Compress 'large_video.mp4' for web sharing"],
            ["Mute the audio in 'video_with_sound.mp4'"]
        ],
        inputs=[user_input],
        label="Try these examples:"
    )

    # Event handlers
    def handle_preset_change(preset_name):
        """Put the chosen preset's template into the request box."""
        return load_preset(preset_name) if preset_name else ""

    preset_dropdown.change(
        handle_preset_change,
        inputs=[preset_dropdown],
        outputs=[user_input]
    )

    generate_btn.click(
        process_request,
        inputs=[user_input, input_file, output_filename],
        outputs=[command_output, params_display, status_output]
    )

    clear_btn.click(
        lambda: ("", "", "", "", ""),
        outputs=[user_input, command_output, params_display, status_output, result_output]
    )

    refresh_history_btn.click(
        get_command_history,
        outputs=[history_output]
    )

    def execute_and_refresh(command):
        """Run the command, then return status, output and refreshed history."""
        status, output = execute_ffmpeg_command(command)
        return status, output, get_command_history()

    # Exactly ONE click handler on the execute button: a second registration
    # would run ffmpeg twice for every click.
    execute_btn.click(
        execute_and_refresh,
        inputs=[command_output],
        outputs=[status_output, result_output, history_output],
        show_progress="full"
    )

demo.launch(
    theme=gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate",
        font=gr.themes.GoogleFont("Inter"),
        text_size="lg",
        spacing_size="lg",
        radius_size="md"
    ),
    footer_links=[
        {"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"},
        {"label": "FFmpeg Documentation", "url": "https://ffmpeg.org/documentation.html"}
    ]
)