Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import subprocess | |
| import json | |
| import os | |
| import tempfile | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import re | |
| import shlex | |
| class FFmpegAgent: | |
| """AI-powered FFmpeg agent that generates and executes FFmpeg commands from natural language.""" | |
| def __init__(self): | |
| self.command_history = [] | |
| self.temp_dir = tempfile.mkdtemp() | |
| def parse_natural_language(self, text: str) -> Dict[str, Any]: | |
| """Parse natural language into FFmpeg command parameters.""" | |
| text = text.lower().strip() | |
| # Initialize parameters | |
| params = { | |
| 'input_file': None, | |
| 'output_file': None, | |
| 'operations': [], | |
| 'quality': 'medium', | |
| 'format': None, | |
| 'codec': None, | |
| 'resolution': None, | |
| 'fps': None, | |
| 'bitrate': None, | |
| 'duration': None, | |
| 'start_time': None, | |
| 'audio_ops': [] | |
| } | |
| # Extract file paths | |
| file_pattern = r'["\']([^"\']+\.(mp4|avi|mov|mkv|flv|webm|mp3|wav|flac|aac))["\']' | |
| files = re.findall(file_pattern, text) | |
| if files: | |
| params['input_file'] = files[0][0] | |
| if len(files) > 1: | |
| params['output_file'] = files[1][0] | |
| # Extract operations | |
| if any(word in text for word in ['resize', 'scale', 'resolution']): | |
| # Extract resolution | |
| res_pattern = r'(\d{3,4})x(\d{3,4})' | |
| res_match = re.search(res_pattern, text) | |
| if res_match: | |
| params['resolution'] = f"{res_match.group(1)}x{res_match.group(2)}" | |
| elif '720p' in text: | |
| params['resolution'] = "1280x720" | |
| elif '1080p' in text: | |
| params['resolution'] = "1920x1080" | |
| elif '4k' in text: | |
| params['resolution'] = "3840x2160" | |
| if any(word in text for word in ['compress', 'reduce', 'smaller']): | |
| params['operations'].append('compress') | |
| if 'high' in text: | |
| params['quality'] = 'high' | |
| elif 'low' in text: | |
| params['quality'] = 'low' | |
| if any(word in text for word in ['convert', 'format']): | |
| # Extract format | |
| format_pattern = r'\.(mp4|avi|mov|mkv|flv|webm|mp3|wav|flac|aac)' | |
| if params['output_file']: | |
| format_match = re.search(format_pattern, params['output_file']) | |
| if format_match: | |
| params['format'] = format_match.group(1) | |
| if 'fps' in text: | |
| fps_pattern = r'(\d+)\s*fps' | |
| fps_match = re.search(fps_pattern, text) | |
| if fps_match: | |
| params['fps'] = int(fps_match.group(1)) | |
| if 'trim' in text or 'cut' in text: | |
| params['operations'].append('trim') | |
| # Extract time patterns | |
| time_pattern = r'(\d{1,2}):(\d{2}):(\d{2})' | |
| times = re.findall(time_pattern, text) | |
| if len(times) >= 1: | |
| params['start_time'] = f"{times[0][0]}:{times[0][1]}:{times[0][2]}" | |
| if len(times) >= 2: | |
| duration = self._calculate_duration(times[0], times[1]) | |
| params['duration'] = duration | |
| if any(word in text for word in ['extract audio', 'audio only']): | |
| params['operations'].append('extract_audio') | |
| if 'mute' in text: | |
| params['audio_ops'].append('mute') | |
| if 'normalize' in text: | |
| params['audio_ops'].append('normalize') | |
| return params | |
| def _calculate_duration(self, start: Tuple, end: Tuple) -> str: | |
| """Calculate duration between two timestamps.""" | |
| start_seconds = int(start[0]) * 3600 + int(start[1]) * 60 + int(start[2]) | |
| end_seconds = int(end[0]) * 3600 + int(end[1]) * 60 + int(end[2]) | |
| duration_seconds = end_seconds - start_seconds | |
| hours = duration_seconds // 3600 | |
| minutes = (duration_seconds % 3600) // 60 | |
| seconds = duration_seconds % 60 | |
| return f"{hours:02d}:{minutes:02d}:{seconds:02d}" | |
| def generate_command(self, params: Dict[str, Any]) -> str: | |
| """Generate FFmpeg command from parameters.""" | |
| if not params['input_file']: | |
| return "Error: No input file specified" | |
| cmd = ['ffmpeg', '-i', shlex.quote(params['input_file'])] | |
| # Add start time if specified | |
| if params['start_time']: | |
| cmd.extend(['-ss', params['start_time']]) | |
| # Add duration if specified | |
| if params['duration']: | |
| cmd.extend(['-t', params['duration']]) | |
| # Video operations | |
| if params['resolution']: | |
| cmd.extend(['-vf', f"scale={params['resolution']}"]) | |
| if params['fps']: | |
| cmd.extend(['-r', str(params['fps'])]) | |
| # Quality settings | |
| if params['quality'] == 'high': | |
| cmd.extend(['-crf', '18']) | |
| elif params['quality'] == 'low': | |
| cmd.extend(['-crf', '28']) | |
| else: | |
| cmd.extend(['-crf', '23']) | |
| # Audio operations | |
| if 'mute' in params['audio_ops']: | |
| cmd.extend(['-an']) | |
| if 'normalize' in params['audio_ops']: | |
| cmd.extend(['-af', 'loudnorm=I=-16:LRA=11:TP=-1.5']) | |
| if params['operations'] == ['extract_audio']: | |
| cmd.extend(['-vn', '-acodec', 'copy']) | |
| # Output file | |
| if params['output_file']: | |
| cmd.append(shlex.quote(params['output_file'])) | |
| else: | |
| # Generate output filename | |
| input_name = os.path.splitext(params['input_file'])[0] | |
| output_ext = params['format'] or 'mp4' | |
| cmd.append(f"{input_name}_processed.{output_ext}") | |
| # Add overwrite flag | |
| cmd.append('-y') | |
| return ' '.join(cmd) | |
| def execute_command(self, command: str) -> Tuple[str, str]: | |
| """Execute FFmpeg command and return output.""" | |
| try: | |
| # Parse command safely | |
| cmd_args = shlex.split(command) | |
| # Execute command | |
| result = subprocess.run( | |
| cmd_args, | |
| capture_output=True, | |
| text=True, | |
| timeout=300 # 5 minute timeout | |
| ) | |
| # Store in history | |
| self.command_history.append({ | |
| 'command': command, | |
| 'stdout': result.stdout, | |
| 'stderr': result.stderr, | |
| 'returncode': result.returncode | |
| }) | |
| if result.returncode == 0: | |
| return "β Command executed successfully!", result.stdout | |
| else: | |
| return f"β Error (code {result.returncode})", result.stderr | |
| except subprocess.TimeoutExpired: | |
| return "β Error: Command timed out after 5 minutes", "" | |
| except Exception as e: | |
| return f"β Error: {str(e)}", "" | |
| def get_preset_commands(self) -> List[Dict[str, str]]: | |
| """Get list of preset commands with descriptions.""" | |
| return [ | |
| { | |
| "name": "Compress Video", | |
| "description": "Compress video for web sharing", | |
| "template": "Compress '{input}' to '{output}' with medium quality" | |
| }, | |
| { | |
| "name": "Resize to 1080p", | |
| "description": "Resize video to 1080p resolution", | |
| "template": "Resize '{input}' to 1920x1080 and save as '{output}'" | |
| }, | |
| { | |
| "name": "Extract Audio", | |
| "description": "Extract audio from video file", | |
| "template": "Extract audio from '{input}' and save as '{output}.mp3'" | |
| }, | |
| { | |
| "name": "Trim Video", | |
| "description": "Trim video between timestamps", | |
| "template": "Trim '{input}' from 00:00:10 to 00:00:30 and save as '{output}'" | |
| }, | |
| { | |
| "name": "Convert to MP4", | |
| "description": "Convert any video to MP4 format", | |
| "template": "Convert '{input}' to MP4 format and save as '{output}'" | |
| }, | |
| { | |
| "name": "Change FPS", | |
| "description": "Change video frame rate", | |
| "template": "Change '{input}' to 30 fps and save as '{output}'" | |
| } | |
| ] | |
| # Initialize the agent | |
| agent = FFmpegAgent() | |
| def process_request(user_input: str, input_file: Optional[str] = None, output_file: Optional[str] = None) -> Tuple[str, str, str]: | |
| """Process user request and generate/execute FFmpeg command.""" | |
| # If files are provided, update the input | |
| if input_file: | |
| user_input = user_input.replace("{input}", f'"{input_file}"') | |
| if output_file: | |
| user_input = user_input.replace("{output}", f'"{output_file}"') | |
| # Parse the natural language | |
| params = agent.parse_natural_language(user_input) | |
| # Generate command | |
| command = agent.generate_command(params) | |
| # Format the parsed parameters for display | |
| param_text = json.dumps(params, indent=2, default=str) | |
| return command, param_text, "" | |
| def execute_ffmpeg_command(command: str) -> Tuple[str, str]: | |
| """Execute the generated FFmpeg command.""" | |
| status, output = agent.execute_command(command) | |
| return status, output | |
| def load_preset(preset_name: str) -> str: | |
| """Load a preset command template.""" | |
| presets = agent.get_preset_commands() | |
| for preset in presets: | |
| if preset["name"] == preset_name: | |
| return preset["template"] | |
| return "" | |
| def get_command_history() -> str: | |
| """Get formatted command history.""" | |
| if not agent.command_history: | |
| return "No commands executed yet." | |
| history_text = [] | |
| for i, cmd in enumerate(agent.command_history[-10:], 1): | |
| status = "β " if cmd["returncode"] == 0 else "β" | |
| history_text.append(f"{i}. {status} {cmd['command']}") | |
| if cmd["stderr"]: | |
| history_text.append(f" Error: {cmd['stderr'][:100]}...") | |
| return "\n".join(history_text) | |
| # Create the Gradio interface | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# π¬ AI FFmpeg Agent") | |
| gr.Markdown("Generate and execute FFmpeg commands using natural language. [Built with anycoder](https://huggingface.co/spaces/akhaliq/anycoder)") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("## π Input") | |
| # User input | |
| user_input = gr.Textbox( | |
| label="Describe what you want to do with your media file", | |
| placeholder="e.g., Resize 'video.mp4' to 720p and compress it, or use the presets below", | |
| lines=3 | |
| ) | |
| # File inputs | |
| with gr.Row(): | |
| input_file = gr.File( | |
| label="Input File (Optional)", | |
| file_types=["video", "audio"] | |
| ) | |
| output_filename = gr.Textbox( | |
| label="Output Filename (Optional)", | |
| placeholder="output.mp4" | |
| ) | |
| # Preset buttons | |
| gr.Markdown("### π Quick Presets") | |
| with gr.Row(): | |
| preset_choices = [p["name"] for p in agent.get_preset_commands()] | |
| preset_dropdown = gr.Dropdown( | |
| choices=preset_choices, | |
| label="Select Preset", | |
| value=None | |
| ) | |
| # Generate button | |
| generate_btn = gr.Button("π§ Generate Command", variant="primary") | |
| with gr.Column(scale=1): | |
| gr.Markdown("## βοΈ Parsed Parameters") | |
| params_display = gr.Code( | |
| label="Parsed Parameters", | |
| language="json", | |
| lines=10 | |
| ) | |
| # Generated command section | |
| gr.Markdown("## π― Generated Command") | |
| command_output = gr.Code( | |
| label="FFmpeg Command", | |
| language="bash", | |
| lines=3 | |
| ) | |
| with gr.Row(): | |
| execute_btn = gr.Button("βΆοΈ Execute Command", variant="primary") | |
| clear_btn = gr.Button("ποΈ Clear") | |
| # Execution results | |
| gr.Markdown("## π Execution Results") | |
| with gr.Row(): | |
| with gr.Column(): | |
| status_output = gr.Textbox( | |
| label="Status", | |
| interactive=False | |
| ) | |
| with gr.Column(): | |
| result_output = gr.Code( | |
| label="Output", | |
| language="text", | |
| lines=10 | |
| ) | |
| # Command history | |
| with gr.Accordion("π Command History", open=False): | |
| history_output = gr.Code( | |
| label="Recent Commands", | |
| language="text", | |
| lines=8, | |
| value=get_command_history() | |
| ) | |
| refresh_history_btn = gr.Button("π Refresh History") | |
| # Examples | |
| gr.Markdown("## π‘ Examples") | |
| gr.Examples( | |
| examples=[ | |
| ["Resize 'video.mp4' to 720p and compress it"], | |
| ["Extract audio from 'movie.mkv' and save as 'audio.mp3'"], | |
| ["Convert 'video.avi' to MP4 format with high quality"], | |
| ["Trim 'clip.mp4' from 00:00:10 to 00:00:30"], | |
| ["Change 'video.mov' to 30 fps and save as 'output.mp4'"], | |
| ["Compress 'large_video.mp4' for web sharing"], | |
| ["Mute the audio in 'video_with_sound.mp4'"] | |
| ], | |
| inputs=[user_input], | |
| label="Try these examples:" | |
| ) | |
| # Event handlers | |
| def handle_preset_change(preset_name): | |
| return load_preset(preset_name) if preset_name else "" | |
| preset_dropdown.change( | |
| handle_preset_change, | |
| inputs=[preset_dropdown], | |
| outputs=[user_input] | |
| ) | |
| generate_btn.click( | |
| process_request, | |
| inputs=[user_input, input_file, output_filename], | |
| outputs=[command_output, params_display, status_output] | |
| ) | |
| execute_btn.click( | |
| execute_ffmpeg_command, | |
| inputs=[command_output], | |
| outputs=[status_output, result_output] | |
| ) | |
| clear_btn.click( | |
| lambda: ("", "", "", "", ""), | |
| outputs=[user_input, command_output, params_display, status_output, result_output] | |
| ) | |
| refresh_history_btn.click( | |
| get_command_history, | |
| outputs=[history_output] | |
| ) | |
| # Auto-refresh history when a command is executed | |
| def execute_and_refresh(command): | |
| status, output = execute_ffmpeg_command(command) | |
| return status, output, get_command_history() | |
| execute_btn.click( | |
| execute_and_refresh, | |
| inputs=[command_output], | |
| outputs=[status_output, result_output, history_output], | |
| show_progress="full" | |
| ) | |
| demo.launch( | |
| theme=gr.themes.Soft( | |
| primary_hue="blue", | |
| secondary_hue="indigo", | |
| neutral_hue="slate", | |
| font=gr.themes.GoogleFont("Inter"), | |
| text_size="lg", | |
| spacing_size="lg", | |
| radius_size="md" | |
| ), | |
| footer_links=[ | |
| {"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}, | |
| {"label": "FFmpeg Documentation", "url": "https://ffmpeg.org/documentation.html"} | |
| ] | |
| ) |