# NOTE: The four lines below were Hugging Face Space page residue
# ("dev_caio / app.py", author avatar caption, commit message, commit hash)
# accidentally pasted above the module docstring; as bare text they are not
# valid Python, so they are preserved here as comments.
# dev_caio / app.py
# Chaitanya-aitf's picture
# Update app.py
# 0ef3eb8 verified
"""
ShortSmith v2 - Gradio Application
Hugging Face Space interface for video highlight extraction.
Features:
- Multi-modal analysis (visual + audio + motion)
- Domain-optimized presets
- Person-specific filtering (optional)
- Scene-aware clip cutting
- Batch testing with parameter variations
"""
import os
import sys
import tempfile
import shutil
import json
import zipfile
from pathlib import Path
import time
import traceback
from typing import List, Dict, Any, Optional
import gradio as gr
import pandas as pd
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
# Initialize logging
try:
from utils.logger import setup_logging, get_logger
setup_logging(log_level="INFO", log_to_console=True)
logger = get_logger("app")
except Exception:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("app")
# =============================================================================
# Shared Utilities
# =============================================================================
def build_metrics_output(result, domain: str, custom_prompt: Optional[str] = None) -> str:
"""
Build formatted metrics output for testing and evaluation.
Args:
result: PipelineResult object
domain: Content domain used for processing
custom_prompt: Custom prompt used (if any)
Returns:
Formatted string with all metrics
"""
lines = []
lines.append("=" * 50)
lines.append("AUTOMATED METRICS (System-Generated)")
lines.append("=" * 50)
lines.append("")
# Processing Metrics
lines.append("PROCESSING METRICS")
lines.append("-" * 30)
lines.append(f"processing_time_seconds: {result.processing_time:.2f}")
lines.append(f"frames_analyzed: {len(result.visual_features)}")
lines.append(f"scenes_detected: {len(result.scenes)}")
lines.append(f"audio_segments_analyzed: {len(result.audio_features)}")
lines.append(f"domain: {domain}")
lines.append(f"custom_prompt: {custom_prompt if custom_prompt else 'none'}")
# Count hooks from scores (estimate based on high-scoring segments)
hooks_detected = sum(1 for s in result.scores if s.combined_score > 0.7) if result.scores else 0
lines.append(f"hooks_detected: {hooks_detected}")
if result.metadata:
lines.append(f"video_duration_seconds: {result.metadata.duration:.2f}")
lines.append(f"video_resolution: {result.metadata.resolution}")
lines.append(f"video_fps: {result.metadata.fps:.2f}")
lines.append("")
# Per Clip Metrics
lines.append("PER CLIP METRICS")
lines.append("-" * 30)
for i, clip in enumerate(result.clips):
lines.append("")
lines.append(f"[Clip {i + 1}]")
lines.append(f" clip_id: {i + 1}")
lines.append(f" start_time: {clip.start_time:.2f}")
lines.append(f" end_time: {clip.end_time:.2f}")
lines.append(f" duration: {clip.duration:.2f}")
lines.append(f" hype_score: {clip.hype_score:.4f}")
lines.append(f" visual_score: {clip.visual_score:.4f}")
lines.append(f" audio_score: {clip.audio_score:.4f}")
lines.append(f" motion_score: {clip.motion_score:.4f}")
# Hook info - derive from segment scores if available
hook_type = "none"
hook_confidence = 0.0
# Find matching segment score for this clip
for score in result.scores:
if abs(score.start_time - clip.start_time) < 1.0:
if score.combined_score > 0.7:
hook_confidence = score.combined_score
# Infer hook type based on dominant score
if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
hook_type = "audio_peak"
elif score.motion_score > score.visual_score:
hook_type = "motion_spike"
else:
hook_type = "visual_highlight"
break
lines.append(f" hook_type: {hook_type}")
lines.append(f" hook_confidence: {hook_confidence:.4f}")
if clip.person_detected:
lines.append(f" person_detected: True")
lines.append(f" person_screen_time: {clip.person_screen_time:.4f}")
lines.append("")
lines.append("=" * 50)
lines.append("END METRICS")
lines.append("=" * 50)
return "\n".join(lines)
# =============================================================================
# Single Video Processing
# =============================================================================
def process_video(
video_file,
domain,
num_clips,
clip_duration,
reference_image,
custom_prompt,
progress=gr.Progress()
):
"""
Main video processing function for single video mode.
Args:
video_file: Uploaded video file path
domain: Content domain for scoring weights
num_clips: Number of clips to extract
clip_duration: Duration of each clip in seconds
reference_image: Optional reference image for person filtering
custom_prompt: Optional custom instructions
progress: Gradio progress tracker
Returns:
Tuple of (status_message, clip1, clip2, clip3, log_text, metrics_text)
"""
if video_file is None:
return "Please upload a video first.", None, None, None, "", ""
log_messages = []
def log(msg):
log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
logger.info(msg)
try:
video_path = Path(video_file)
log(f"Processing video: {video_path.name}")
progress(0.05, desc="Validating video...")
# Import pipeline components
from utils.helpers import validate_video_file, validate_image_file, format_duration
from pipeline.orchestrator import PipelineOrchestrator
# Validate video
validation = validate_video_file(video_file)
if not validation.is_valid:
return f"Error: {validation.error_message}", None, None, None, "\n".join(log_messages), ""
log(f"Video size: {validation.file_size / (1024*1024):.1f} MB")
# Validate reference image if provided
ref_path = None
if reference_image is not None:
ref_validation = validate_image_file(reference_image)
if ref_validation.is_valid:
ref_path = reference_image
log(f"Reference image: {Path(reference_image).name}")
else:
log(f"Warning: Invalid reference image - {ref_validation.error_message}")
# Map domain string to internal value
domain_map = {
"Sports": "sports",
"Vlogs": "vlogs",
"Music Videos": "music",
"Podcasts": "podcasts",
"Gaming": "gaming",
"General": "general",
}
domain_value = domain_map.get(domain, "general")
log(f"Domain: {domain_value}")
# Create output directory
output_dir = Path(tempfile.mkdtemp(prefix="shortsmith_output_"))
log(f"Output directory: {output_dir}")
# Progress callback to update UI during processing
def on_progress(pipeline_progress):
stage = pipeline_progress.stage.value
pct = pipeline_progress.progress
msg = pipeline_progress.message
log(f"[{stage}] {msg}")
# Map pipeline progress (0-1) to our range (0.1-0.9)
mapped_progress = 0.1 + (pct * 0.8)
progress(mapped_progress, desc=f"{stage}: {msg}")
# Initialize pipeline
progress(0.1, desc="Initializing AI models...")
log("Initializing pipeline...")
pipeline = PipelineOrchestrator(progress_callback=on_progress)
# Process video
progress(0.15, desc="Starting analysis...")
log(f"Processing: {int(num_clips)} clips @ {int(clip_duration)}s each")
result = pipeline.process(
video_path=video_path,
num_clips=int(num_clips),
clip_duration=float(clip_duration),
domain=domain_value,
reference_image=ref_path,
custom_prompt=custom_prompt.strip() if custom_prompt else None,
)
progress(0.9, desc="Extracting clips...")
# Handle result
if result.success:
log(f"Processing complete in {result.processing_time:.1f}s")
clip_paths = []
for i, clip in enumerate(result.clips):
if clip.clip_path.exists():
output_path = output_dir / f"highlight_{i+1}.mp4"
shutil.copy2(clip.clip_path, output_path)
clip_paths.append(str(output_path))
log(f"Clip {i+1}: {format_duration(clip.start_time)} - {format_duration(clip.end_time)} (score: {clip.hype_score:.2f})")
status = f"Successfully extracted {len(clip_paths)} highlight clips!\nProcessing time: {result.processing_time:.1f}s"
# Build metrics output
metrics_output = build_metrics_output(result, domain_value, custom_prompt.strip() if custom_prompt else None)
pipeline.cleanup()
progress(1.0, desc="Done!")
# Return up to 3 clips
clip1 = clip_paths[0] if len(clip_paths) > 0 else None
clip2 = clip_paths[1] if len(clip_paths) > 1 else None
clip3 = clip_paths[2] if len(clip_paths) > 2 else None
return status, clip1, clip2, clip3, "\n".join(log_messages), metrics_output
else:
log(f"Processing failed: {result.error_message}")
pipeline.cleanup()
return f"Error: {result.error_message}", None, None, None, "\n".join(log_messages), ""
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log(error_msg)
log(traceback.format_exc())
logger.exception("Pipeline error")
return error_msg, None, None, None, "\n".join(log_messages), ""
# =============================================================================
# Batch Testing Functions
# =============================================================================
def generate_test_queue(
videos: List[str],
domains: List[str],
durations: List[int],
num_clips: int,
ref_image: Optional[str],
prompts: List[str],
include_no_prompt: bool
) -> List[Dict[str, Any]]:
"""Generate all parameter combinations to test (cartesian product)."""
# Build prompt list
prompt_list = []
if include_no_prompt:
prompt_list.append(None) # No prompt baseline
prompt_list.extend([p.strip() for p in prompts if p and p.strip()])
# If no prompts at all, use just None
if not prompt_list:
prompt_list = [None]
# Map domain display names to internal values
domain_map = {
"Sports": "sports",
"Vlogs": "vlogs",
"Music Videos": "music",
"Podcasts": "podcasts",
"Gaming": "gaming",
"General": "general",
}
queue = []
test_id = 1
for video in videos:
video_name = Path(video).name if video else "unknown"
for domain in domains:
domain_value = domain_map.get(domain, "general")
for duration in durations:
for prompt in prompt_list:
queue.append({
"test_id": test_id,
"video_path": video,
"video_name": video_name,
"domain": domain,
"domain_value": domain_value,
"clip_duration": duration,
"num_clips": num_clips,
"reference_image": ref_image,
"custom_prompt": prompt,
})
test_id += 1
return queue
def run_single_batch_test(config: Dict[str, Any], output_base_dir: Path) -> Dict[str, Any]:
"""Run a single test from the batch queue."""
from utils.helpers import validate_video_file
from pipeline.orchestrator import PipelineOrchestrator
test_id = config["test_id"]
video_path = config["video_path"]
video_name = config["video_name"]
domain_value = config["domain_value"]
duration = config["clip_duration"]
num_clips = config["num_clips"]
ref_image = config["reference_image"]
custom_prompt = config["custom_prompt"]
# Create unique output folder for this test
prompt_suffix = "no_prompt" if not custom_prompt else f"prompt_{hash(custom_prompt) % 1000}"
test_folder = f"{Path(video_name).stem}_{domain_value}_{duration}s_{prompt_suffix}"
output_dir = output_base_dir / test_folder
output_dir.mkdir(parents=True, exist_ok=True)
result_data = {
"test_id": test_id,
"video_name": video_name,
"domain": domain_value,
"clip_duration": duration,
"custom_prompt": custom_prompt if custom_prompt else "none",
"num_clips": num_clips,
"status": "failed",
"error": None,
"processing_time": 0,
"frames_analyzed": 0,
"scenes_detected": 0,
"hooks_detected": 0,
"clips": [],
"clip_paths": [],
}
try:
# Validate video
validation = validate_video_file(video_path)
if not validation.is_valid:
result_data["error"] = validation.error_message
return result_data
# Initialize and run pipeline
pipeline = PipelineOrchestrator()
result = pipeline.process(
video_path=video_path,
num_clips=num_clips,
clip_duration=float(duration),
domain=domain_value,
reference_image=ref_image,
custom_prompt=custom_prompt,
)
if result.success:
result_data["status"] = "success"
result_data["processing_time"] = round(result.processing_time, 2)
result_data["frames_analyzed"] = len(result.visual_features)
result_data["scenes_detected"] = len(result.scenes)
result_data["hooks_detected"] = sum(1 for s in result.scores if s.combined_score > 0.7) if result.scores else 0
# Copy clips and collect data
for i, clip in enumerate(result.clips):
if clip.clip_path.exists():
clip_output = output_dir / f"clip_{i+1}.mp4"
shutil.copy2(clip.clip_path, clip_output)
result_data["clip_paths"].append(str(clip_output))
# Find hook type for this clip
hook_type = "none"
hook_confidence = 0.0
for score in result.scores:
if abs(score.start_time - clip.start_time) < 1.0:
if score.combined_score > 0.7:
hook_confidence = score.combined_score
if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
hook_type = "audio_peak"
elif score.motion_score > score.visual_score:
hook_type = "motion_spike"
else:
hook_type = "visual_highlight"
break
result_data["clips"].append({
"clip_id": i + 1,
"start_time": round(clip.start_time, 2),
"end_time": round(clip.end_time, 2),
"duration": round(clip.duration, 2),
"hype_score": round(clip.hype_score, 4),
"visual_score": round(clip.visual_score, 4),
"audio_score": round(clip.audio_score, 4),
"motion_score": round(clip.motion_score, 4),
"hook_type": hook_type,
"hook_confidence": round(hook_confidence, 4),
})
else:
result_data["error"] = result.error_message
pipeline.cleanup()
except Exception as e:
result_data["error"] = str(e)
logger.exception(f"Batch test {test_id} failed")
return result_data
def results_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
"""Convert batch results to a pandas DataFrame for display."""
rows = []
for r in results:
row = {
"Test ID": r["test_id"],
"Video": r["video_name"],
"Domain": r["domain"],
"Duration": f"{r['clip_duration']}s",
"Prompt": r["custom_prompt"][:20] + "..." if len(r["custom_prompt"]) > 20 else r["custom_prompt"],
"Status": r["status"],
"Time (s)": r["processing_time"],
"Frames": r["frames_analyzed"],
"Hooks": r["hooks_detected"],
}
# Add clip scores
for i, clip in enumerate(r.get("clips", [])[:3]):
row[f"Clip {i+1} Hype"] = clip.get("hype_score", 0)
rows.append(row)
return pd.DataFrame(rows)
def results_to_csv(results: List[Dict[str, Any]]) -> str:
"""Convert results to CSV format."""
rows = []
for r in results:
row = {
"test_id": r["test_id"],
"video_name": r["video_name"],
"domain": r["domain"],
"clip_duration": r["clip_duration"],
"custom_prompt": r["custom_prompt"],
"num_clips": r["num_clips"],
"status": r["status"],
"error": r.get("error", ""),
"processing_time": r["processing_time"],
"frames_analyzed": r["frames_analyzed"],
"scenes_detected": r["scenes_detected"],
"hooks_detected": r["hooks_detected"],
}
# Add per-clip data
for i in range(3):
if i < len(r.get("clips", [])):
clip = r["clips"][i]
row[f"clip_{i+1}_start"] = clip["start_time"]
row[f"clip_{i+1}_end"] = clip["end_time"]
row[f"clip_{i+1}_hype"] = clip["hype_score"]
row[f"clip_{i+1}_visual"] = clip["visual_score"]
row[f"clip_{i+1}_audio"] = clip["audio_score"]
row[f"clip_{i+1}_motion"] = clip["motion_score"]
row[f"clip_{i+1}_hook_type"] = clip["hook_type"]
else:
row[f"clip_{i+1}_start"] = ""
row[f"clip_{i+1}_end"] = ""
row[f"clip_{i+1}_hype"] = ""
row[f"clip_{i+1}_visual"] = ""
row[f"clip_{i+1}_audio"] = ""
row[f"clip_{i+1}_motion"] = ""
row[f"clip_{i+1}_hook_type"] = ""
rows.append(row)
df = pd.DataFrame(rows)
return df.to_csv(index=False)
def results_to_json(results: List[Dict[str, Any]]) -> str:
"""Convert results to JSON format."""
# Remove clip_paths from export (they're temp files)
export_results = []
for r in results:
r_copy = r.copy()
r_copy.pop("clip_paths", None)
export_results.append(r_copy)
return json.dumps(export_results, indent=2)
def create_clips_zip(results: List[Dict[str, Any]]) -> Optional[str]:
"""Create a ZIP file of all extracted clips."""
zip_path = Path(tempfile.mkdtemp()) / "batch_clips.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for r in results:
if r["status"] == "success":
folder_name = f"{Path(r['video_name']).stem}_{r['domain']}_{r['clip_duration']}s"
if r["custom_prompt"] != "none":
folder_name += f"_prompt"
for clip_path in r.get("clip_paths", []):
if Path(clip_path).exists():
arcname = f"{folder_name}/{Path(clip_path).name}"
zf.write(clip_path, arcname)
return str(zip_path) if zip_path.exists() else None
# Batch state (module level for simplicity)
batch_state = {
"is_running": False,
"should_cancel": False,
"results": [],
"output_dir": None,
}
def run_batch_tests(
videos,
domains,
durations,
num_clips,
reference_image,
include_no_prompt,
prompt1,
prompt2,
prompt3,
progress=gr.Progress()
):
"""Main batch testing function."""
global batch_state
# Validate inputs
if not videos:
return "Please upload at least one video.", None, "", "", None, None, None
if not domains:
return "Please select at least one domain.", None, "", "", None, None, None
if not durations:
return "Please select at least one duration.", None, "", "", None, None, None
# Collect prompts
prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]
# Generate test queue
queue = generate_test_queue(
videos=videos,
domains=domains,
durations=durations,
num_clips=int(num_clips),
ref_image=reference_image,
prompts=prompts,
include_no_prompt=include_no_prompt,
)
if not queue:
return "No tests to run. Please check your configuration.", None, "", "", None, None, None
# Initialize batch state
batch_state["is_running"] = True
batch_state["should_cancel"] = False
batch_state["results"] = []
batch_state["output_dir"] = Path(tempfile.mkdtemp(prefix="shortsmith_batch_"))
total_tests = len(queue)
log_messages = []
def log(msg):
log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
logger.info(msg)
log(f"Starting batch testing: {total_tests} tests")
log(f"Videos: {len(videos)}, Domains: {len(domains)}, Durations: {len(durations)}, Prompts: {len(prompts) + (1 if include_no_prompt else 0)}")
# Run tests sequentially
for i, test_config in enumerate(queue):
if batch_state["should_cancel"]:
log("Batch cancelled by user")
break
test_id = test_config["test_id"]
video_name = test_config["video_name"]
domain = test_config["domain_value"]
duration = test_config["clip_duration"]
prompt = test_config["custom_prompt"] or "no-prompt"
log(f"[{i+1}/{total_tests}] Testing: {video_name} | {domain} | {duration}s | {prompt[:30]}...")
progress((i + 1) / total_tests, desc=f"Test {i+1}/{total_tests}: {video_name}")
# Run the test
result = run_single_batch_test(test_config, batch_state["output_dir"])
batch_state["results"].append(result)
if result["status"] == "success":
log(f" ✓ Completed in {result['processing_time']}s")
else:
log(f" ✗ Failed: {result.get('error', 'Unknown error')}")
# Finalize
batch_state["is_running"] = False
completed = len([r for r in batch_state["results"] if r["status"] == "success"])
failed = len([r for r in batch_state["results"] if r["status"] == "failed"])
log(f"Batch complete: {completed} succeeded, {failed} failed")
# Generate outputs
results_df = results_to_dataframe(batch_state["results"])
csv_content = results_to_csv(batch_state["results"])
json_content = results_to_json(batch_state["results"])
# Save CSV and JSON to files for download
csv_path = batch_state["output_dir"] / "results.csv"
json_path = batch_state["output_dir"] / "results.json"
csv_path.write_text(csv_content)
json_path.write_text(json_content)
# Create ZIP of clips
zip_path = create_clips_zip(batch_state["results"])
status = f"Batch complete: {completed}/{total_tests} tests succeeded"
return (
status,
results_df,
"\n".join(log_messages),
json_content,
str(csv_path),
str(json_path),
zip_path,
)
def cancel_batch():
"""Cancel the running batch."""
global batch_state
batch_state["should_cancel"] = True
return "Cancelling batch... (will stop after current test completes)"
def calculate_queue_size(videos, domains, durations, include_no_prompt, prompt1, prompt2, prompt3):
"""Calculate and display the queue size."""
num_videos = len(videos) if videos else 0
num_domains = len(domains) if domains else 0
num_durations = len(durations) if durations else 0
prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]
num_prompts = len(prompts) + (1 if include_no_prompt else 0)
if num_prompts == 0:
num_prompts = 1 # Default to no-prompt if nothing selected
total = num_videos * num_domains * num_durations * num_prompts
return f"Queue: {num_videos} video(s) × {num_domains} domain(s) × {num_durations} duration(s) × {num_prompts} prompt(s) = **{total} tests**"
# =============================================================================
# Build Gradio Interface
# =============================================================================
with gr.Blocks(
title="ShortSmith v2",
theme=gr.themes.Soft(),
css="""
.container { max-width: 1200px; margin: auto; }
.output-video { min-height: 200px; }
"""
) as demo:
gr.Markdown("""
# ShortSmith v2
### AI-Powered Video Highlight Extractor
Upload a video and automatically extract the most engaging highlight clips using AI analysis.
""")
with gr.Tabs():
# =================================================================
# Tab 1: Single Video
# =================================================================
with gr.TabItem("Single Video"):
with gr.Row():
# Left column - Inputs
with gr.Column(scale=1):
gr.Markdown("### Input")
video_input = gr.Video(
label="Upload Video",
sources=["upload"],
)
with gr.Accordion("Settings", open=True):
domain_dropdown = gr.Dropdown(
choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "General"],
value="General",
label="Content Domain",
info="Select the type of content for optimized scoring"
)
with gr.Row():
num_clips_slider = gr.Slider(
minimum=1,
maximum=3,
value=3,
step=1,
label="Number of Clips",
info="How many highlight clips to extract"
)
duration_slider = gr.Slider(
minimum=5,
maximum=30,
value=15,
step=1,
label="Clip Duration (seconds)",
info="Target duration for each clip"
)
with gr.Accordion("Person Filtering (Optional)", open=False):
reference_image = gr.Image(
label="Reference Image",
type="filepath",
sources=["upload"],
)
gr.Markdown("*Upload a photo of a person to prioritize clips featuring them.*")
with gr.Accordion("Custom Instructions (Optional)", open=False):
custom_prompt = gr.Textbox(
label="Additional Instructions",
placeholder="E.g., 'Focus on crowd reactions' or 'Prioritize action scenes'",
lines=2,
)
process_btn = gr.Button(
"Extract Highlights",
variant="primary",
size="lg"
)
# Right column - Outputs
with gr.Column(scale=1):
gr.Markdown("### Output")
status_output = gr.Textbox(
label="Status",
lines=2,
interactive=False
)
gr.Markdown("#### Extracted Clips")
clip1_output = gr.Video(label="Clip 1", elem_classes=["output-video"])
clip2_output = gr.Video(label="Clip 2", elem_classes=["output-video"])
clip3_output = gr.Video(label="Clip 3", elem_classes=["output-video"])
with gr.Accordion("Processing Log", open=True):
log_output = gr.Textbox(
label="Log",
lines=10,
interactive=False,
show_copy_button=True
)
with gr.Accordion("Automated Metrics (System-Generated)", open=True):
metrics_output = gr.Textbox(
label="Metrics for Testing",
lines=20,
interactive=False,
show_copy_button=True,
info="Copy these metrics for evaluation spreadsheets"
)
# Connect single video processing
process_btn.click(
fn=process_video,
inputs=[
video_input,
domain_dropdown,
num_clips_slider,
duration_slider,
reference_image,
custom_prompt
],
outputs=[
status_output,
clip1_output,
clip2_output,
clip3_output,
log_output,
metrics_output
],
show_progress="full"
)
# =================================================================
# Tab 2: Batch Testing
# =================================================================
with gr.TabItem("Batch Testing"):
with gr.Row():
# Left column - Configuration
with gr.Column(scale=1):
gr.Markdown("### Batch Configuration")
batch_videos = gr.File(
label="Upload Video(s)",
file_count="multiple",
file_types=["video"],
)
gr.Markdown("#### Domains to Test")
batch_domains = gr.CheckboxGroup(
choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "General"],
value=["General"],
label="Select domains",
)
gr.Markdown("#### Clip Durations to Test")
batch_durations = gr.CheckboxGroup(
choices=[10, 15, 20, 30],
value=[15],
label="Select durations (seconds)",
)
batch_num_clips = gr.Slider(
minimum=1,
maximum=3,
value=3,
step=1,
label="Number of Clips per Test",
)
with gr.Accordion("Custom Prompts", open=True):
batch_no_prompt = gr.Checkbox(
label="Include no-prompt baseline",
value=True,
info="Test without any custom prompt for comparison"
)
batch_prompt1 = gr.Textbox(
label="Prompt 1",
placeholder="E.g., 'Focus on action moments'",
lines=1,
)
batch_prompt2 = gr.Textbox(
label="Prompt 2",
placeholder="E.g., 'Find crowd reactions'",
lines=1,
)
batch_prompt3 = gr.Textbox(
label="Prompt 3",
placeholder="E.g., 'Prioritize emotional moments'",
lines=1,
)
with gr.Accordion("Reference Image (Optional)", open=False):
batch_ref_image = gr.Image(
label="Reference Image (applies to all tests)",
type="filepath",
sources=["upload"],
)
# Queue size indicator
queue_info = gr.Markdown("Queue: 0 tests")
with gr.Row():
batch_start_btn = gr.Button(
"Start Batch",
variant="primary",
size="lg"
)
batch_cancel_btn = gr.Button(
"Cancel",
variant="secondary",
size="lg"
)
# Right column - Results
with gr.Column(scale=1):
gr.Markdown("### Results")
batch_status = gr.Textbox(
label="Status",
lines=2,
interactive=False
)
batch_results_table = gr.Dataframe(
label="Test Results",
headers=["Test ID", "Video", "Domain", "Duration", "Prompt", "Status", "Time (s)", "Frames", "Hooks"],
interactive=False,
)
with gr.Accordion("Processing Log", open=True):
batch_log = gr.Textbox(
label="Log",
lines=15,
interactive=False,
show_copy_button=True
)
with gr.Accordion("Full Results (JSON)", open=False):
batch_json = gr.Textbox(
label="JSON Output",
lines=10,
interactive=False,
show_copy_button=True
)
gr.Markdown("#### Download Results")
with gr.Row():
csv_download = gr.File(label="CSV Results")
json_download = gr.File(label="JSON Results")
zip_download = gr.File(label="All Clips (ZIP)")
# Update queue size when inputs change
queue_inputs = [batch_videos, batch_domains, batch_durations, batch_no_prompt, batch_prompt1, batch_prompt2, batch_prompt3]
for inp in queue_inputs:
inp.change(
fn=calculate_queue_size,
inputs=queue_inputs,
outputs=queue_info
)
# Connect batch processing
batch_start_btn.click(
fn=run_batch_tests,
inputs=[
batch_videos,
batch_domains,
batch_durations,
batch_num_clips,
batch_ref_image,
batch_no_prompt,
batch_prompt1,
batch_prompt2,
batch_prompt3,
],
outputs=[
batch_status,
batch_results_table,
batch_log,
batch_json,
csv_download,
json_download,
zip_download,
],
show_progress="full"
)
batch_cancel_btn.click(
fn=cancel_batch,
inputs=[],
outputs=[batch_status]
)
gr.Markdown("""
---
**ShortSmith v2** | Powered by Qwen2-VL, InsightFace, and Librosa |
[GitHub](https://github.com) | Built with Gradio
""")
# Launch the app
if __name__ == "__main__":
demo.queue()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
else:
# For HuggingFace Spaces
demo.queue()
demo.launch()