# Samuelblue's picture
# Upload folder using huggingface_hub
# d750341 verified
import gradio as gr
import subprocess
import json
import os
import tempfile
from typing import List, Dict, Any, Optional, Tuple
import re
import shlex
class FFmpegAgent:
    """AI-powered FFmpeg agent that generates and executes FFmpeg commands from natural language.

    The agent works in three steps: ``parse_natural_language`` turns a free-text
    request into a parameter dict, ``generate_command`` renders that dict as a
    shell-quoted ``ffmpeg`` command line, and ``execute_command`` runs such a
    command line and records the outcome in ``command_history``.

    Attributes:
        command_history: One dict per executed command with the keys
            ``command``, ``stdout``, ``stderr`` and ``returncode``.
        temp_dir: Scratch directory created at construction time; the methods
            of this class do not otherwise use it.
    """

    def __init__(self):
        self.command_history = []
        self.temp_dir = tempfile.mkdtemp()

    def parse_natural_language(self, text: str) -> Dict[str, Any]:
        """Parse natural language into FFmpeg command parameters.

        Keywords are matched case-insensitively, but quoted file paths are
        taken from the text exactly as written so that paths keep their
        original letter case.

        Args:
            text: Free-text request; file names must be wrapped in single or
                double quotes and end in a recognised media extension.

        Returns:
            Parameter dict with the keys ``input_file``, ``output_file``,
            ``operations``, ``quality``, ``format``, ``codec``, ``resolution``,
            ``fps``, ``bitrate``, ``duration``, ``start_time`` and ``audio_ops``.
        """
        original = text.strip()
        # Lower-cased copy used only for keyword detection.
        text = original.lower()

        params = {
            'input_file': None,
            'output_file': None,
            'operations': [],
            'quality': 'medium',
            'format': None,
            'codec': None,
            'resolution': None,
            'fps': None,
            'bitrate': None,
            'duration': None,
            'start_time': None,
            'audio_ops': []
        }

        # Extract file paths from the ORIGINAL text: lower-casing a path would
        # point at a different file on case-sensitive filesystems.
        file_pattern = r'["\']([^"\']+\.(mp4|avi|mov|mkv|flv|webm|mp3|wav|flac|aac))["\']'
        files = re.findall(file_pattern, original, flags=re.IGNORECASE)
        if files:
            params['input_file'] = files[0][0]
            if len(files) > 1:
                params['output_file'] = files[1][0]

        # Resolution: an explicit WxH wins over the named presets.
        if any(word in text for word in ('resize', 'scale', 'resolution')):
            res_match = re.search(r'(\d{3,4})x(\d{3,4})', text)
            if res_match:
                params['resolution'] = f"{res_match.group(1)}x{res_match.group(2)}"
            elif '720p' in text:
                params['resolution'] = "1280x720"
            elif '1080p' in text:
                params['resolution'] = "1920x1080"
            elif '4k' in text:
                params['resolution'] = "3840x2160"

        if any(word in text for word in ('compress', 'reduce', 'smaller')):
            params['operations'].append('compress')
            if 'high' in text:
                params['quality'] = 'high'
            elif 'low' in text:
                params['quality'] = 'low'

        if any(word in text for word in ('convert', 'format')):
            # The target format is the (final) extension of the output file.
            if params['output_file']:
                extension = os.path.splitext(params['output_file'])[1]
                if extension:
                    params['format'] = extension[1:].lower()

        if 'fps' in text:
            fps_match = re.search(r'(\d+)\s*fps', text)
            if fps_match:
                params['fps'] = int(fps_match.group(1))

        if 'trim' in text or 'cut' in text:
            params['operations'].append('trim')
            # First HH:MM:SS timestamp is the start, second one the end.
            times = re.findall(r'(\d{1,2}):(\d{2}):(\d{2})', text)
            if times:
                params['start_time'] = ':'.join(times[0])
            if len(times) >= 2:
                # Only a strictly positive span is a usable duration; an end
                # that is not after the start is ignored.
                start_seconds = FFmpegAgent._to_seconds(times[0])
                end_seconds = FFmpegAgent._to_seconds(times[1])
                if end_seconds > start_seconds:
                    params['duration'] = self._calculate_duration(times[0], times[1])

        if any(phrase in text for phrase in ('extract audio', 'audio only')):
            params['operations'].append('extract_audio')
        if 'mute' in text:
            params['audio_ops'].append('mute')
        if 'normalize' in text:
            params['audio_ops'].append('normalize')

        return params

    @staticmethod
    def _to_seconds(stamp: Tuple) -> int:
        """Convert an (hours, minutes, seconds) tuple of int-like values to seconds."""
        hours, minutes, seconds = (int(part) for part in stamp)
        return hours * 3600 + minutes * 60 + seconds

    def _calculate_duration(self, start: Tuple, end: Tuple) -> str:
        """Calculate duration between two timestamps.

        Args:
            start: (hours, minutes, seconds) of the start time.
            end: (hours, minutes, seconds) of the end time.

        Returns:
            The span formatted as ``HH:MM:SS``. If ``end`` is not after
            ``start`` the result is ``"00:00:00"`` rather than a negative,
            malformed timestamp.
        """
        span = max(FFmpegAgent._to_seconds(end) - FFmpegAgent._to_seconds(start), 0)
        hours, remainder = divmod(span, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def generate_command(self, params: Dict[str, Any]) -> str:
        """Generate FFmpeg command from parameters.

        Args:
            params: Parameter dict as produced by ``parse_natural_language``.
                Only ``input_file`` is required; missing optional keys are
                treated as unset.

        Returns:
            A shell-quoted command line (safe to pass to ``shlex.split``), or
            the string ``"Error: No input file specified"`` when there is no
            input file.
        """
        input_file = params.get('input_file')
        if not input_file:
            return "Error: No input file specified"

        output_file = params.get('output_file')
        if not output_file:
            # Derive "<input>_processed.<format or mp4>" next to the input.
            stem = os.path.splitext(input_file)[0]
            output_file = f"{stem}_processed.{params.get('format') or 'mp4'}"

        operations = params.get('operations') or []
        audio_ops = params.get('audio_ops') or []
        audio_only = 'extract_audio' in operations

        # -y is a global option, so it goes before the input rather than
        # trailing after the output file.
        cmd = ['ffmpeg', '-y', '-i', input_file]

        if params.get('start_time'):
            cmd.extend(['-ss', params['start_time']])
        if params.get('duration'):
            cmd.extend(['-t', params['duration']])

        if audio_only:
            # Video is dropped entirely, so scale/fps/crf would have no effect.
            cmd.append('-vn')
        else:
            if params.get('resolution'):
                cmd.extend(['-vf', f"scale={params['resolution']}"])
            if params.get('fps'):
                cmd.extend(['-r', str(params['fps'])])
            crf = {'high': '18', 'low': '28'}.get(params.get('quality'), '23')
            cmd.extend(['-crf', crf])

        muted = 'mute' in audio_ops
        filtered = False
        if muted:
            cmd.append('-an')
        elif 'normalize' in audio_ops:
            # An audio filter is pointless once the audio stream is removed.
            cmd.extend(['-af', 'loudnorm=I=-16:LRA=11:TP=-1.5'])
            filtered = True

        if audio_only and not muted and not filtered:
            # Stream copy cannot be combined with filters and only works when
            # the target container accepts the source codec; use it only when
            # the container stays the same, otherwise let ffmpeg re-encode.
            same_container = (
                os.path.splitext(input_file)[1].lower()
                == os.path.splitext(output_file)[1].lower()
            )
            if same_container:
                cmd.extend(['-acodec', 'copy'])

        cmd.append(output_file)

        # Quote every argument (including a generated output name) so paths
        # with spaces or shell metacharacters survive shlex.split().
        return ' '.join(shlex.quote(str(arg)) for arg in cmd)

    def execute_command(self, command: str) -> Tuple[str, str]:
        """Execute FFmpeg command and return output.

        The command is split with ``shlex`` and run without a shell. Only
        command lines whose program is exactly ``ffmpeg`` are executed; every
        executed command is appended to ``command_history``.

        Args:
            command: Command line as produced by ``generate_command``.

        Returns:
            ``(status, output)``: a human-readable status line and the process
            output (stderr on failure; stdout, or stderr if stdout is empty,
            on success). Errors are reported through the status line instead
            of being raised.
        """
        if not command or not command.strip():
            return "❌ Error: No command to execute", ""

        try:
            cmd_args = shlex.split(command)
        except ValueError as e:  # e.g. unbalanced quotes
            return f"❌ Error: {str(e)}", ""

        # The command text comes from an editable UI field; refuse anything
        # that is not an ffmpeg invocation (this also rejects the "Error: ..."
        # string returned by generate_command).
        # NOTE(review): ffmpeg itself can still read and write arbitrary
        # paths; restrict input/output locations if this is exposed publicly.
        if not cmd_args or cmd_args[0] != 'ffmpeg':
            return "❌ Error: Only ffmpeg commands can be executed", ""

        try:
            result = subprocess.run(
                cmd_args,
                capture_output=True,
                text=True,
                timeout=300  # 5 minute timeout
            )
        except subprocess.TimeoutExpired:
            return "❌ Error: Command timed out after 5 minutes", ""
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # OSError covers a missing ffmpeg binary.
            return f"❌ Error: {str(e)}", ""

        self.command_history.append({
            'command': command,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        })

        if result.returncode == 0:
            # ffmpeg logs to stderr, so stdout is usually empty on success.
            return "✅ Command executed successfully!", result.stdout or result.stderr
        return f"❌ Error (code {result.returncode})", result.stderr

    def get_preset_commands(self) -> List[Dict[str, str]]:
        """Get list of preset commands with descriptions.

        Returns:
            Dicts with ``name``, ``description`` and ``template``; templates
            contain the placeholders ``{input}`` and ``{output}``.
        """
        return [
            {
                "name": "Compress Video",
                "description": "Compress video for web sharing",
                "template": "Compress '{input}' to '{output}' with medium quality"
            },
            {
                "name": "Resize to 1080p",
                "description": "Resize video to 1080p resolution",
                "template": "Resize '{input}' to 1920x1080 and save as '{output}'"
            },
            {
                "name": "Extract Audio",
                "description": "Extract audio from video file",
                "template": "Extract audio from '{input}' and save as '{output}.mp3'"
            },
            {
                "name": "Trim Video",
                "description": "Trim video between timestamps",
                "template": "Trim '{input}' from 00:00:10 to 00:00:30 and save as '{output}'"
            },
            {
                "name": "Convert to MP4",
                "description": "Convert any video to MP4 format",
                "template": "Convert '{input}' to MP4 format and save as '{output}'"
            },
            {
                "name": "Change FPS",
                "description": "Change video frame rate",
                "template": "Change '{input}' to 30 fps and save as '{output}'"
            }
        ]
# Module-level FFmpegAgent instance shared by the handler functions below;
# its command_history is what get_command_history() reads.
agent = FFmpegAgent()
def process_request(user_input: str, input_file: Optional[str] = None, output_file: Optional[str] = None) -> Tuple[str, str, str]:
    """Process user request and generate an FFmpeg command (without running it).

    Args:
        user_input: Free-text request; may contain the placeholders
            ``{input}`` and ``{output}``.
        input_file: Optional path of the input file. It replaces ``{input}``
            and is also used when the text itself names no input file.
        output_file: Optional output file name. It replaces ``{output}`` and
            is also used when the text itself names no output file.

    Returns:
        ``(command, params_json, status)``: the generated command line, the
        parsed parameters as indented JSON, and an empty status string.
    """
    user_input = user_input or ""

    # Substitute template placeholders with the supplied paths.
    if input_file:
        user_input = user_input.replace("{input}", f'"{input_file}"')
    if output_file:
        user_input = user_input.replace("{output}", f'"{output_file}"')

    params = agent.parse_natural_language(user_input)

    # A supplied file must not be silently dropped just because the text had
    # no placeholder (or the path did not match the parser's pattern). The
    # parser may also hand the path back lower-cased; restore the exact path.
    if input_file and (
        not params.get('input_file') or params['input_file'] == input_file.lower()
    ):
        params['input_file'] = input_file
    if output_file and (
        not params.get('output_file') or params['output_file'] == output_file.lower()
    ):
        params['output_file'] = output_file

    command = agent.generate_command(params)

    # Format the parsed parameters for display
    param_text = json.dumps(params, indent=2, default=str)
    return command, param_text, ""
def execute_ffmpeg_command(command: str) -> Tuple[str, str]:
    """Run *command* through the shared agent and return its (status, output) pair."""
    return agent.execute_command(command)
def load_preset(preset_name: str) -> str:
    """Return the template of the preset called *preset_name*, or "" if none matches."""
    matching_templates = (
        entry["template"]
        for entry in agent.get_preset_commands()
        if entry["name"] == preset_name
    )
    # First match wins; fall back to an empty string for unknown names.
    return next(matching_templates, "")
def get_command_history() -> str:
    """Get formatted command history.

    Returns:
        One numbered line per command for the ten most recent entries of
        ``agent.command_history``, each prefixed with a success/failure mark.
        Failed commands get an extra line with the last line of their stderr
        (at most 100 characters). Returns a placeholder message when nothing
        has been executed yet.
    """
    if not agent.command_history:
        return "No commands executed yet."

    max_error_chars = 100
    history_text = []
    for i, entry in enumerate(agent.command_history[-10:], 1):
        succeeded = entry["returncode"] == 0
        status = "✅" if succeeded else "❌"
        history_text.append(f"{i}. {status} {entry['command']}")

        # ffmpeg writes its banner and progress to stderr even on success, so
        # stderr is only reported as an error for failed commands.
        stderr_lines = (entry["stderr"] or "").strip().splitlines()
        if stderr_lines and not succeeded:
            # The beginning of stderr is the version banner; the actual
            # failure reason is at the end.
            message = stderr_lines[-1].strip()
            if len(message) > max_error_chars:
                message = message[:max_error_chars] + "..."
            history_text.append(f"   Error: {message}")
    return "\n".join(history_text)
# Create the Gradio interface: input column, parsed-parameter view, generated
# command, execution results, history and examples, followed by event wiring.
with gr.Blocks() as demo:
    gr.Markdown("# 🎬 AI FFmpeg Agent")
    gr.Markdown("Generate and execute FFmpeg commands using natural language. [Built with anycoder](https://huggingface.co/spaces/akhaliq/anycoder)")

    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("## 📝 Input")

            # User input
            user_input = gr.Textbox(
                label="Describe what you want to do with your media file",
                placeholder="e.g., Resize 'video.mp4' to 720p and compress it, or use the presets below",
                lines=3
            )

            # File inputs
            with gr.Row():
                input_file = gr.File(
                    label="Input File (Optional)",
                    file_types=["video", "audio"]
                )
                output_filename = gr.Textbox(
                    label="Output Filename (Optional)",
                    placeholder="output.mp4"
                )

            # Preset buttons
            gr.Markdown("### 🚀 Quick Presets")
            with gr.Row():
                preset_choices = [p["name"] for p in agent.get_preset_commands()]
                preset_dropdown = gr.Dropdown(
                    choices=preset_choices,
                    label="Select Preset",
                    value=None
                )

            # Generate button
            generate_btn = gr.Button("🔧 Generate Command", variant="primary")

        with gr.Column(scale=1):
            gr.Markdown("## ⚙️ Parsed Parameters")
            params_display = gr.Code(
                label="Parsed Parameters",
                language="json",
                lines=10
            )

    # Generated command section
    gr.Markdown("## 🎯 Generated Command")
    command_output = gr.Code(
        label="FFmpeg Command",
        language="bash",
        lines=3
    )

    with gr.Row():
        execute_btn = gr.Button("▶️ Execute Command", variant="primary")
        clear_btn = gr.Button("🗑️ Clear")

    # Execution results
    gr.Markdown("## 📊 Execution Results")
    with gr.Row():
        with gr.Column():
            status_output = gr.Textbox(
                label="Status",
                interactive=False
            )
        with gr.Column():
            result_output = gr.Code(
                label="Output",
                language="text",
                lines=10
            )

    # Command history
    with gr.Accordion("📚 Command History", open=False):
        history_output = gr.Code(
            label="Recent Commands",
            language="text",
            lines=8,
            value=get_command_history()
        )
        refresh_history_btn = gr.Button("🔄 Refresh History")

    # Examples
    gr.Markdown("## 💡 Examples")
    gr.Examples(
        examples=[
            ["Resize 'video.mp4' to 720p and compress it"],
            ["Extract audio from 'movie.mkv' and save as 'audio.mp3'"],
            ["Convert 'video.avi' to MP4 format with high quality"],
            ["Trim 'clip.mp4' from 00:00:10 to 00:00:30"],
            ["Change 'video.mov' to 30 fps and save as 'output.mp4'"],
            ["Compress 'large_video.mp4' for web sharing"],
            ["Mute the audio in 'video_with_sound.mp4'"]
        ],
        inputs=[user_input],
        label="Try these examples:"
    )

    # Event handlers
    def handle_preset_change(preset_name):
        """Fill the request box with the selected preset's template ("" if none)."""
        return load_preset(preset_name) if preset_name else ""

    def execute_and_refresh(command):
        """Run the command once and return status, output and refreshed history."""
        status, output = execute_ffmpeg_command(command)
        return status, output, get_command_history()

    preset_dropdown.change(
        handle_preset_change,
        inputs=[preset_dropdown],
        outputs=[user_input]
    )

    generate_btn.click(
        process_request,
        inputs=[user_input, input_file, output_filename],
        outputs=[command_output, params_display, status_output]
    )

    # Exactly ONE click listener on the execute button: registering both a
    # plain "execute" handler and this one would run ffmpeg twice per click.
    # This handler also refreshes the history panel.
    execute_btn.click(
        execute_and_refresh,
        inputs=[command_output],
        outputs=[status_output, result_output, history_output],
        show_progress="full"
    )

    clear_btn.click(
        lambda: ("", "", "", "", ""),
        outputs=[user_input, command_output, params_display, status_output, result_output]
    )

    refresh_history_btn.click(
        get_command_history,
        outputs=[history_output]
    )
# Build the launch arguments first, then start the app with them.
_launch_theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="indigo",
    neutral_hue="slate",
    font=gr.themes.GoogleFont("Inter"),
    text_size="lg",
    spacing_size="lg",
    radius_size="md",
)
_launch_footer_links = [
    {"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"},
    {"label": "FFmpeg Documentation", "url": "https://ffmpeg.org/documentation.html"},
]
demo.launch(theme=_launch_theme, footer_links=_launch_footer_links)