exploreerx / app.py
hivecorp's picture
Create app.py
fad923e verified
import gradio as gr
import requests
import json
import subprocess
import os
import tempfile
import shutil
from urllib.parse import urlparse
import time
def download_file(url, dest_path):
    """Download a file from URL to destination path.

    The response body is streamed to ``dest_path`` in 8192-byte chunks so
    the whole file is never held in memory.

    Args:
        url: Address of the file to fetch.
        dest_path: Local path the response body is written to.

    Returns:
        True when the body was written completely. False on any failure;
        the error is printed and a partially written file is removed.
    """
    try:
        # With stream=True the connection stays checked out until the body
        # is consumed or the response is closed; the context manager
        # guarantees the release even when the transfer fails midway.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return True
    except Exception as e:
        # Deliberately broad: callers rely on a boolean result, not on
        # exceptions, to detect a failed download.
        print(f"Error downloading {url}: {str(e)}")
        # Best-effort cleanup so a truncated file is never mistaken for a
        # complete download.
        try:
            os.remove(dest_path)
        except (OSError, TypeError, ValueError):
            pass
        return False
def extract_metadata(video_path):
    """Read container-level metadata for a video by invoking ffprobe.

    ffprobe is asked for a JSON report and only its ``format`` section is
    consulted.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        A dict with ``duration`` (float), ``filesize`` (int), ``bitrate``
        (int) and ``encoder`` (taken from the format tags, ``'Unknown'``
        when absent). An empty dict is returned when ffprobe exits with a
        non-zero status or when anything raises; in the latter case the
        error is printed.
    """
    probe_cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', '-show_streams', video_path
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True)
        if probe.returncode != 0:
            return {}
        report = json.loads(probe.stdout)
        container = report.get('format', {})
        return {
            'duration': float(container.get('duration', 0)),
            'filesize': int(container.get('size', 0)),
            'bitrate': int(container.get('bit_rate', 0)),
            'encoder': container.get('tags', {}).get('encoder', 'Unknown')
        }
    except Exception as e:
        print(f"Error extracting metadata: {str(e)}")
    return {}
def generate_thumbnail(video_path, thumbnail_path):
    """Generate thumbnail from video.

    Runs ffmpeg to grab a single frame at the one-second mark and writes
    it to ``thumbnail_path`` (overwriting any existing file).

    Args:
        video_path: Path of the source video.
        thumbnail_path: Path the image is written to.

    Returns:
        True only when ffmpeg exited successfully and the thumbnail file
        exists afterwards; False otherwise. Failures to launch ffmpeg are
        printed rather than raised.
    """
    cmd = [
        'ffmpeg', '-i', video_path, '-ss', '00:00:01',
        '-vframes', '1', '-q:v', '2', thumbnail_path, '-y'
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except Exception as e:
        # Exception (not a bare except) so Ctrl-C and interpreter shutdown
        # still propagate.
        print(f"Error generating thumbnail: {str(e)}")
        return False
    # The exit status must be checked as well: a file left over from an
    # earlier run would otherwise make a failed ffmpeg call look successful.
    return result.returncode == 0 and os.path.exists(thumbnail_path)
def _sanitize_job_id(raw_id):
    """Reduce a job id to a token that is safe to embed in a file name."""
    token = ''.join(
        ch if ch.isalnum() or ch in '-_' else '_' for ch in str(raw_id)
    )
    return token or 'output'


def _build_ffmpeg_command(job_data, input_files, output_path):
    """Assemble the ffmpeg argument list for a job description."""
    cmd = ['ffmpeg']
    for input_file in input_files:
        cmd.extend(['-i', input_file])

    filter_complex = [
        item.get('filter', '') for item in (job_data.get('filters') or [])
    ]
    filter_complex = [spec for spec in filter_complex if spec]
    if filter_complex:
        cmd.extend(['-filter_complex', ';'.join(filter_complex)])

    for output in (job_data.get('outputs') or []):
        for option in (output.get('options') or []):
            opt = option.get('option', '')
            arg = option.get('argument', '')
            if not opt:
                continue
            cmd.append(str(opt))
            # Keep numeric arguments such as 0 (falsy) and stringify them:
            # subprocess only accepts str/bytes/path items.
            if arg is not None and arg != '':
                cmd.append(str(arg))

    cmd.append(output_path)
    return cmd


def _format_metadata(metadata):
    """Render extracted metadata as the block appended to the status text."""
    if not metadata:
        return ""
    return (
        "\n**Metadata:**\n"
        f"- Duration: {metadata.get('duration', 0):.2f} seconds\n"
        f"- File size: {metadata.get('filesize', 0) / (1024*1024):.2f} MB\n"
        f"- Bitrate: {metadata.get('bitrate', 0) / 1000:.0f} kbps\n"
        f"- Encoder: {metadata.get('encoder', 'Unknown')}\n"
    )


def process_ffmpeg_job(json_input, api_url=None):
    """Process FFmpeg job from JSON input or API URL.

    The job description is a JSON object read with these keys:

    - ``inputs``: list of objects; each ``file_url`` is downloaded.
    - ``filters``: list of objects; each ``filter`` string is joined with
      ``;`` into one ``-filter_complex`` argument.
    - ``outputs``: list of objects whose ``options`` entries contribute an
      ``option`` and, when present, its ``argument`` to the command line.
    - ``metadata``: flags; any truthy ``duration``/``filesize``/``bitrate``/
      ``encoder`` triggers metadata extraction, ``thumbnail`` triggers
      thumbnail generation.
    - ``id``: used as the prefix of the produced file names.

    Work happens in a temporary directory; the finished video (and the
    thumbnail, if any) are copied to the current working directory.

    Args:
        json_input: Job description as a JSON string. Ignored when
            ``api_url`` is non-blank.
        api_url: Optional URL that returns the job description as JSON.

    Returns:
        A ``(output_path, status_message, thumbnail_path)`` tuple. On
        failure ``output_path`` and ``thumbnail_path`` are None and the
        message describes the problem; this function does not raise.
    """
    try:
        # Obtain the job description
        if api_url and api_url.strip():
            response = requests.get(api_url.strip(), timeout=30)
            response.raise_for_status()
            job_data = response.json()
        else:
            job_data = json.loads(json_input)

        if not isinstance(job_data, dict):
            return None, "Job description must be a JSON object", None

        # The id comes from untrusted input and ends up in file paths, so
        # strip anything that could act as a path separator or "..".
        job_id = _sanitize_job_id(job_data.get('id', 'output'))
        requested = job_data.get('metadata') or {}

        with tempfile.TemporaryDirectory() as temp_dir:
            # Download all input files
            input_files = []
            for i, input_item in enumerate(job_data.get('inputs') or []):
                file_url = input_item.get('file_url')
                if not file_url:
                    continue

                # Keep the remote file name (and thus its extension); the
                # index prefix keeps names unique across inputs.
                filename = os.path.basename(urlparse(file_url).path)
                if not filename:
                    filename = f"input_{i}"
                dest_path = os.path.join(temp_dir, f"{i}_{filename}")

                print(f"Downloading {file_url}...")
                if not download_file(file_url, dest_path):
                    return None, f"Failed to download: {file_url}", None
                input_files.append(dest_path)

            if not input_files:
                return None, "No input files to process", None

            output_filename = f"{job_id}_output.mp4"
            output_path = os.path.join(temp_dir, output_filename)
            cmd = _build_ffmpeg_command(job_data, input_files, output_path)

            # Execute FFmpeg command
            print("Executing FFmpeg command...")
            print(' '.join(cmd))
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                return None, f"FFmpeg error: {result.stderr}", None

            if not os.path.exists(output_path):
                return None, "Output file was not created", None

            # Extract metadata if requested
            metadata = {}
            wanted_fields = ('duration', 'filesize', 'bitrate', 'encoder')
            if any(requested.get(field) for field in wanted_fields):
                metadata = extract_metadata(output_path)

            # Generate thumbnail if requested
            thumbnail_path = None
            if requested.get('thumbnail'):
                thumb_filename = f"{job_id}_thumbnail.jpg"
                thumb_path = os.path.join(temp_dir, thumb_filename)
                if generate_thumbnail(output_path, thumb_path):
                    # The temporary directory is deleted on exit, so the
                    # thumbnail must be copied out to remain usable.
                    perm_thumb_path = os.path.join(".", thumb_filename)
                    shutil.copy2(thumb_path, perm_thumb_path)
                    thumbnail_path = perm_thumb_path

            # Copy output to permanent location for the same reason
            permanent_output = os.path.join(".", output_filename)
            shutil.copy2(output_path, permanent_output)

            metadata_str = _format_metadata(metadata)
            return (
                permanent_output,
                f"Processing complete!\n{metadata_str}",
                thumbnail_path,
            )
    except json.JSONDecodeError:
        return None, "Invalid JSON format", None
    except requests.RequestException as e:
        return None, f"API request error: {str(e)}", None
    except Exception as e:
        return None, f"Processing error: {str(e)}", None
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI for the FFmpeg video processor.

    The left column holds the optional API URL, the JSON job editor
    (pre-filled with an example job) and the "Process Video" button. The
    right column holds the resulting video, thumbnail, status text and a
    download box; the thumbnail and download box start hidden.

    Returns:
        The assembled ``gr.Blocks`` app; the caller is responsible for
        launching it.
    """
    example_job = json.dumps({
        "inputs": [
            {"file_url": "https://example.com/video1.mp4"},
            {"file_url": "https://example.com/video2.mp4"}
        ],
        "filters": [
            {"filter": "[0:v][1:v]concat=n=2:v=1:a=0[outv]"}
        ],
        "outputs": [
            {
                "options": [
                    {"option": "-map", "argument": "[outv]"},
                    {"option": "-c:v", "argument": "libx264"}
                ]
            }
        ],
        "metadata": {
            "thumbnail": True,
            "filesize": True,
            "duration": True
        },
        "id": "example_job"
    }, indent=2)

    with gr.Blocks(title="FFmpeg Video Processor") as app:
        gr.Markdown("# FFmpeg Video Processor")
        gr.Markdown("Process videos using FFmpeg with JSON configuration from API or direct input.")

        with gr.Row():
            with gr.Column():
                api_url_input = gr.Textbox(
                    label="API URL (optional)",
                    placeholder="https://api.example.com/ffmpeg-job",
                    lines=1
                )
                json_input = gr.Textbox(
                    label="JSON Input (used if no API URL provided)",
                    placeholder='{"inputs": [...], "filters": [...], "outputs": [...]}',
                    lines=15,
                    value=example_job
                )
                process_btn = gr.Button("Process Video", variant="primary")

            with gr.Column():
                output_video = gr.Video(label="Processed Video")
                output_thumbnail = gr.Image(label="Thumbnail", visible=False)
                status_text = gr.Textbox(label="Status", lines=8)
                # Named download_box so it does not shadow the module-level
                # download_file() function.
                download_box = gr.File(label="Download Processed Video", visible=False)

        def process_and_update(api_url, json_str):
            """Run the job and map its result onto the output components."""
            output_path, status, thumbnail = process_ffmpeg_job(json_str, api_url)
            if output_path and os.path.exists(output_path):
                return (
                    output_path,  # video
                    status,  # status text
                    # One update per component carries both the new value
                    # and the visibility change.
                    gr.update(value=output_path, visible=True),
                    gr.update(value=thumbnail, visible=bool(thumbnail)),
                )
            return (
                None,  # video
                status,  # status text
                gr.update(value=None, visible=False),  # hide download
                gr.update(value=None, visible=False),  # hide thumbnail
            )

        process_btn.click(
            fn=process_and_update,
            inputs=[api_url_input, json_input],
            outputs=[output_video, status_text, download_box, output_thumbnail]
        )

        gr.Markdown(
            "\n"
            "## Instructions:\n"
            "1. Either provide an API URL that returns the JSON configuration, or paste the JSON directly\n"
            "2. The JSON should contain:\n"
            "- `inputs`: Array of input files with `file_url`\n"
            "- `filters`: Array of FFmpeg filter strings\n"
            "- `outputs`: Array of output options\n"
            "- `metadata`: Optional metadata extraction settings\n"
            "- `id`: Job identifier\n"
            "3. Click \"Process Video\" to start processing\n"
            "4. The processed video will be displayed and available for download\n"
        )

    return app
# Script entry point: build the Gradio app and start serving it. Guarded so
# that importing this module does not launch a server.
if __name__ == "__main__":
    app = create_interface()
    app.launch()