"""
ShortSmith v2 - Gradio Application

Hugging Face Space interface for video highlight extraction.

Features:
- Multi-modal analysis (visual + audio + motion)
- Domain-optimized presets
- Person-specific filtering (optional)
- Scene-aware clip cutting
- Batch testing with parameter variations
"""

import hashlib
import itertools
import json
import os
import shutil
import sys
import tempfile
import time
import traceback
import zipfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import gradio as gr
import pandas as pd

# Add project root to path so `utils` and `pipeline` resolve when run as a script
sys.path.insert(0, str(Path(__file__).parent))

# Initialize logging; fall back to the stdlib configuration if the project
# logger cannot be imported or configured.
try:
    from utils.logger import setup_logging, get_logger

    setup_logging(log_level="INFO", log_to_console=True)
    logger = get_logger("app")
except Exception:
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("app")


# =============================================================================
# Shared Utilities
# =============================================================================

# Display name (shown in the UI) -> internal domain value passed to the pipeline.
DOMAIN_MAP = {
    "Sports": "sports",
    "Vlogs": "vlogs",
    "Music Videos": "music",
    "Podcasts": "podcasts",
    "Gaming": "gaming",
    "General": "general",
}
DOMAIN_CHOICES = list(DOMAIN_MAP)

# A segment whose combined score exceeds this value is counted as a "hook".
HOOK_SCORE_THRESHOLD = 0.7
# A segment "matches" a clip when their start times differ by less than this (seconds).
START_TIME_TOLERANCE = 1.0
# The UI and the CSV export expose at most this many clips per run.
MAX_DISPLAY_CLIPS = 3


def _make_log(messages: List[str]) -> Callable[[str], None]:
    """Return a logger that timestamps into *messages* and mirrors to `logger`."""

    def log(msg: str) -> None:
        messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
        logger.info(msg)

    return log


def _count_hooks(scores) -> int:
    """Count segment scores whose combined score exceeds the hook threshold."""
    if not scores:
        return 0
    return sum(1 for s in scores if s.combined_score > HOOK_SCORE_THRESHOLD)


def _infer_hook(clip, scores) -> Tuple[str, float]:
    """
    Derive (hook_type, hook_confidence) for a clip from the segment scores.

    The first segment starting within START_TIME_TOLERANCE of the clip start is
    taken as the match. If its combined score is above the hook threshold, the
    hook type is named after its dominant component; otherwise ("none", 0.0).
    """
    for score in scores or ():
        if abs(score.start_time - clip.start_time) >= START_TIME_TOLERANCE:
            continue
        if score.combined_score <= HOOK_SCORE_THRESHOLD:
            break  # matched segment is not strong enough to be a hook
        if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
            return "audio_peak", score.combined_score
        if score.motion_score > score.visual_score:
            return "motion_spike", score.combined_score
        return "visual_highlight", score.combined_score
    return "none", 0.0


def _cleanup_pipeline(pipeline) -> None:
    """Release pipeline resources; a cleanup failure is logged, never raised."""
    if pipeline is None:
        return
    try:
        pipeline.cleanup()
    except Exception:
        logger.exception("Pipeline cleanup failed")


def _prompt_suffix(custom_prompt: Optional[str]) -> str:
    """
    Return a filesystem-safe folder suffix identifying a prompt.

    Uses a SHA-256 prefix rather than the built-in ``hash()``, whose value for
    strings changes between interpreter runs and so gave unstable folder names.
    """
    if not custom_prompt:
        return "no_prompt"
    digest = hashlib.sha256(custom_prompt.encode("utf-8")).hexdigest()[:8]
    return f"prompt_{digest}"


def _count_prompt_variants(prompts: List[str], include_no_prompt: bool) -> int:
    """Number of prompt variants tested; falls back to one (no-prompt) if empty."""
    return max(1, len(prompts) + (1 if include_no_prompt else 0))


def build_metrics_output(result, domain: str, custom_prompt: Optional[str] = None) -> str:
    """
    Build formatted metrics output for testing and evaluation.

    Args:
        result: PipelineResult object
        domain: Content domain used for processing
        custom_prompt: Custom prompt used (if any)

    Returns:
        Formatted string with all metrics
    """
    lines = [
        "=" * 50,
        "AUTOMATED METRICS (System-Generated)",
        "=" * 50,
        "",
        # Processing Metrics
        "PROCESSING METRICS",
        "-" * 30,
        f"processing_time_seconds: {result.processing_time:.2f}",
        f"frames_analyzed: {len(result.visual_features)}",
        f"scenes_detected: {len(result.scenes)}",
        f"audio_segments_analyzed: {len(result.audio_features)}",
        f"domain: {domain}",
        f"custom_prompt: {custom_prompt if custom_prompt else 'none'}",
        # Hooks are estimated from high-scoring segments
        f"hooks_detected: {_count_hooks(result.scores)}",
    ]

    if result.metadata:
        lines.append(f"video_duration_seconds: {result.metadata.duration:.2f}")
        lines.append(f"video_resolution: {result.metadata.resolution}")
        lines.append(f"video_fps: {result.metadata.fps:.2f}")

    # Per Clip Metrics
    lines += ["", "PER CLIP METRICS", "-" * 30]

    for clip_id, clip in enumerate(result.clips, start=1):
        hook_type, hook_confidence = _infer_hook(clip, result.scores)
        lines += [
            "",
            f"[Clip {clip_id}]",
            f" clip_id: {clip_id}",
            f" start_time: {clip.start_time:.2f}",
            f" end_time: {clip.end_time:.2f}",
            f" duration: {clip.duration:.2f}",
            f" hype_score: {clip.hype_score:.4f}",
            f" visual_score: {clip.visual_score:.4f}",
            f" audio_score: {clip.audio_score:.4f}",
            f" motion_score: {clip.motion_score:.4f}",
            f" hook_type: {hook_type}",
            f" hook_confidence: {hook_confidence:.4f}",
        ]
        if clip.person_detected:
            lines.append(" person_detected: True")
            lines.append(f" person_screen_time: {clip.person_screen_time:.4f}")

    lines += ["", "=" * 50, "END METRICS", "=" * 50]
    return "\n".join(lines)


# =============================================================================
# Single Video Processing
# =============================================================================

def process_video(
    video_file,
    domain,
    num_clips,
    clip_duration,
    reference_image,
    custom_prompt,
    progress=gr.Progress(),
):
    """
    Main video processing function for single video mode.

    Args:
        video_file: Uploaded video file path
        domain: Content domain for scoring weights
        num_clips: Number of clips to extract
        clip_duration: Duration of each clip in seconds
        reference_image: Optional reference image for person filtering
        custom_prompt: Optional custom instructions
        progress: Gradio progress tracker

    Returns:
        Tuple of (status_message, clip1, clip2, clip3, log_text, metrics_text)
    """
    if video_file is None:
        return "Please upload a video first.", None, None, None, "", ""

    log_messages: List[str] = []
    log = _make_log(log_messages)

    def failure(message: str):
        """Build the 6-tuple returned to the UI when no clips are produced."""
        return message, None, None, None, "\n".join(log_messages), ""

    pipeline = None
    try:
        video_path = Path(video_file)
        log(f"Processing video: {video_path.name}")
        progress(0.05, desc="Validating video...")

        # Import pipeline components lazily so the UI can start without them
        from utils.helpers import validate_video_file, validate_image_file, format_duration
        from pipeline.orchestrator import PipelineOrchestrator

        # Validate video
        validation = validate_video_file(video_file)
        if not validation.is_valid:
            return failure(f"Error: {validation.error_message}")

        log(f"Video size: {validation.file_size / (1024*1024):.1f} MB")

        # Validate reference image if provided; an invalid image is ignored
        # (with a warning) rather than failing the whole run.
        ref_path = None
        if reference_image is not None:
            ref_validation = validate_image_file(reference_image)
            if ref_validation.is_valid:
                ref_path = reference_image
                log(f"Reference image: {Path(reference_image).name}")
            else:
                log(f"Warning: Invalid reference image - {ref_validation.error_message}")

        # Map domain string to internal value
        domain_value = DOMAIN_MAP.get(domain, "general")
        log(f"Domain: {domain_value}")

        # A whitespace-only prompt is treated as "no prompt"
        prompt_value = (custom_prompt or "").strip() or None

        # Create output directory
        output_dir = Path(tempfile.mkdtemp(prefix="shortsmith_output_"))
        log(f"Output directory: {output_dir}")

        # Progress callback to update UI during processing
        def on_progress(pipeline_progress):
            stage = pipeline_progress.stage.value
            pct = pipeline_progress.progress
            msg = pipeline_progress.message
            log(f"[{stage}] {msg}")
            # Map pipeline progress (0-1) to our range (0.1-0.9)
            mapped_progress = 0.1 + (pct * 0.8)
            progress(mapped_progress, desc=f"{stage}: {msg}")

        # Initialize pipeline
        progress(0.1, desc="Initializing AI models...")
        log("Initializing pipeline...")
        pipeline = PipelineOrchestrator(progress_callback=on_progress)

        # Process video
        progress(0.15, desc="Starting analysis...")
        log(f"Processing: {int(num_clips)} clips @ {int(clip_duration)}s each")

        result = pipeline.process(
            video_path=video_path,
            num_clips=int(num_clips),
            clip_duration=float(clip_duration),
            domain=domain_value,
            reference_image=ref_path,
            custom_prompt=prompt_value,
        )

        progress(0.9, desc="Extracting clips...")

        if not result.success:
            log(f"Processing failed: {result.error_message}")
            return failure(f"Error: {result.error_message}")

        log(f"Processing complete in {result.processing_time:.1f}s")

        # Copy clips out of the pipeline's working area before it is cleaned up
        clip_paths = []
        for clip_no, clip in enumerate(result.clips, start=1):
            if clip.clip_path.exists():
                output_path = output_dir / f"highlight_{clip_no}.mp4"
                shutil.copy2(clip.clip_path, output_path)
                clip_paths.append(str(output_path))
                log(
                    f"Clip {clip_no}: {format_duration(clip.start_time)} - "
                    f"{format_duration(clip.end_time)} (score: {clip.hype_score:.2f})"
                )

        status = (
            f"Successfully extracted {len(clip_paths)} highlight clips!\n"
            f"Processing time: {result.processing_time:.1f}s"
        )

        # Build metrics output
        metrics_output = build_metrics_output(result, domain_value, prompt_value)

        progress(1.0, desc="Done!")

        # Return up to 3 clips, padding the missing slots with None
        padded = clip_paths + [None] * MAX_DISPLAY_CLIPS
        clip1, clip2, clip3 = padded[:MAX_DISPLAY_CLIPS]
        return status, clip1, clip2, clip3, "\n".join(log_messages), metrics_output

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log(error_msg)
        log(traceback.format_exc())
        logger.exception("Pipeline error")
        return failure(error_msg)
    finally:
        # Runs on success, failure and exception, so resources are never leaked
        _cleanup_pipeline(pipeline)


# =============================================================================
# Batch Testing Functions
# =============================================================================

def generate_test_queue(
    videos: List[str],
    domains: List[str],
    durations: List[int],
    num_clips: int,
    ref_image: Optional[str],
    prompts: List[str],
    include_no_prompt: bool,
) -> List[Dict[str, Any]]:
    """
    Generate all parameter combinations to test (cartesian product).

    Args:
        videos: Paths of the videos to test
        domains: Domain display names (keys of DOMAIN_MAP)
        durations: Clip durations in seconds
        num_clips: Number of clips requested per test
        ref_image: Optional reference image applied to every test
        prompts: Custom prompts; blank entries are dropped
        include_no_prompt: Whether to add a no-prompt baseline variant

    Returns:
        One config dict per (video, domain, duration, prompt) combination,
        numbered from 1 in iteration order.
    """
    # Build prompt list: optional baseline first, then the non-blank prompts
    prompt_list: List[Optional[str]] = [None] if include_no_prompt else []
    prompt_list.extend(p.strip() for p in prompts if p and p.strip())

    # If no prompts at all, use just None
    if not prompt_list:
        prompt_list = [None]

    combos = itertools.product(videos, domains, durations, prompt_list)
    return [
        {
            "test_id": test_id,
            "video_path": video,
            "video_name": Path(video).name if video else "unknown",
            "domain": domain,
            "domain_value": DOMAIN_MAP.get(domain, "general"),
            "clip_duration": duration,
            "num_clips": num_clips,
            "reference_image": ref_image,
            "custom_prompt": prompt,
        }
        for test_id, (video, domain, duration, prompt) in enumerate(combos, start=1)
    ]


def run_single_batch_test(config: Dict[str, Any], output_base_dir: Path) -> Dict[str, Any]:
    """
    Run a single test from the batch queue.

    Args:
        config: One entry produced by generate_test_queue
        output_base_dir: Directory under which this test's clip folder is created

    Returns:
        Result dict with status ("success"/"failed"), error, summary counters,
        per-clip metrics ("clips") and the copied clip files ("clip_paths").
        Exceptions are caught and reported through the "error" field.
    """
    from utils.helpers import validate_video_file
    from pipeline.orchestrator import PipelineOrchestrator

    test_id = config["test_id"]
    video_path = config["video_path"]
    video_name = config["video_name"]
    domain_value = config["domain_value"]
    duration = config["clip_duration"]
    num_clips = config["num_clips"]
    ref_image = config["reference_image"]
    custom_prompt = config["custom_prompt"]

    # Create unique output folder for this test
    test_folder = (
        f"{Path(video_name).stem}_{domain_value}_{duration}s_{_prompt_suffix(custom_prompt)}"
    )
    output_dir = output_base_dir / test_folder
    output_dir.mkdir(parents=True, exist_ok=True)

    result_data = {
        "test_id": test_id,
        "video_name": video_name,
        "domain": domain_value,
        "clip_duration": duration,
        "custom_prompt": custom_prompt if custom_prompt else "none",
        "num_clips": num_clips,
        "status": "failed",
        "error": None,
        "processing_time": 0,
        "frames_analyzed": 0,
        "scenes_detected": 0,
        "hooks_detected": 0,
        "clips": [],
        "clip_paths": [],
    }

    pipeline = None
    try:
        # Validate video
        validation = validate_video_file(video_path)
        if not validation.is_valid:
            result_data["error"] = validation.error_message
            return result_data

        # Initialize and run pipeline
        pipeline = PipelineOrchestrator()
        result = pipeline.process(
            video_path=video_path,
            num_clips=num_clips,
            clip_duration=float(duration),
            domain=domain_value,
            reference_image=ref_image,
            custom_prompt=custom_prompt,
        )

        if not result.success:
            result_data["error"] = result.error_message
            return result_data

        result_data["status"] = "success"
        result_data["processing_time"] = round(result.processing_time, 2)
        result_data["frames_analyzed"] = len(result.visual_features)
        result_data["scenes_detected"] = len(result.scenes)
        result_data["hooks_detected"] = _count_hooks(result.scores)

        # Copy clips and collect data (only for clips whose file exists)
        for clip_no, clip in enumerate(result.clips, start=1):
            if not clip.clip_path.exists():
                continue
            clip_output = output_dir / f"clip_{clip_no}.mp4"
            shutil.copy2(clip.clip_path, clip_output)
            result_data["clip_paths"].append(str(clip_output))

            hook_type, hook_confidence = _infer_hook(clip, result.scores)
            result_data["clips"].append({
                "clip_id": clip_no,
                "start_time": round(clip.start_time, 2),
                "end_time": round(clip.end_time, 2),
                "duration": round(clip.duration, 2),
                "hype_score": round(clip.hype_score, 4),
                "visual_score": round(clip.visual_score, 4),
                "audio_score": round(clip.audio_score, 4),
                "motion_score": round(clip.motion_score, 4),
                "hook_type": hook_type,
                "hook_confidence": round(hook_confidence, 4),
            })

    except Exception as e:
        result_data["error"] = str(e)
        logger.exception("Batch test %s failed", test_id)
    finally:
        # Also runs when process() raises, which previously leaked the pipeline
        _cleanup_pipeline(pipeline)

    return result_data


def results_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Convert batch results to a pandas DataFrame for display.

    Prompts longer than 20 characters are truncated with "...". A
    "Clip N Hype" column is added for each of the first three clips a test
    produced; rows with fewer clips are left empty there by pandas.
    """
    rows = []
    for r in results:
        prompt = r["custom_prompt"]
        row = {
            "Test ID": r["test_id"],
            "Video": r["video_name"],
            "Domain": r["domain"],
            "Duration": f"{r['clip_duration']}s",
            "Prompt": prompt[:20] + "..." if len(prompt) > 20 else prompt,
            "Status": r["status"],
            "Time (s)": r["processing_time"],
            "Frames": r["frames_analyzed"],
            "Hooks": r["hooks_detected"],
        }
        # Add clip scores
        for clip_no, clip in enumerate(r.get("clips", [])[:MAX_DISPLAY_CLIPS], start=1):
            row[f"Clip {clip_no} Hype"] = clip.get("hype_score", 0)
        rows.append(row)

    return pd.DataFrame(rows)


# CSV column suffix -> key in the per-clip result dict, in output column order.
_CSV_CLIP_FIELDS = (
    ("start", "start_time"),
    ("end", "end_time"),
    ("hype", "hype_score"),
    ("visual", "visual_score"),
    ("audio", "audio_score"),
    ("motion", "motion_score"),
    ("hook_type", "hook_type"),
)


def results_to_csv(results: List[Dict[str, Any]]) -> str:
    """
    Convert results to CSV format.

    Every row carries the same fixed set of per-clip columns for three clips;
    slots for clips that were not produced are written as empty strings.
    """
    rows = []
    for r in results:
        row = {
            "test_id": r["test_id"],
            "video_name": r["video_name"],
            "domain": r["domain"],
            "clip_duration": r["clip_duration"],
            "custom_prompt": r["custom_prompt"],
            "num_clips": r["num_clips"],
            "status": r["status"],
            "error": r.get("error", ""),
            "processing_time": r["processing_time"],
            "frames_analyzed": r["frames_analyzed"],
            "scenes_detected": r["scenes_detected"],
            "hooks_detected": r["hooks_detected"],
        }
        # Add per-clip data
        clips = r.get("clips", [])
        for slot in range(MAX_DISPLAY_CLIPS):
            clip = clips[slot] if slot < len(clips) else None
            for suffix, key in _CSV_CLIP_FIELDS:
                row[f"clip_{slot + 1}_{suffix}"] = clip[key] if clip is not None else ""
        rows.append(row)

    return pd.DataFrame(rows).to_csv(index=False)


def results_to_json(results: List[Dict[str, Any]]) -> str:
    """Convert results to JSON format, omitting the temp-file `clip_paths`."""
    # Remove clip_paths from export (they're temp files)
    export_results = [
        {key: value for key, value in r.items() if key != "clip_paths"}
        for r in results
    ]
    return json.dumps(export_results, indent=2)


def create_clips_zip(results: List[Dict[str, Any]]) -> Optional[str]:
    """
    Create a ZIP file of all extracted clips.

    Clips of successful tests are stored under the name of the per-test folder
    they were copied into. Previously every prompted test shared one
    "..._prompt" folder, so tests that differed only by prompt collided.

    Returns:
        Path of the ZIP file, or None if it does not exist after writing.
    """
    zip_path = Path(tempfile.mkdtemp()) / "batch_clips.zip"

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for r in results:
            if r["status"] != "success":
                continue
            for clip_path in map(Path, r.get("clip_paths", [])):
                if clip_path.exists():
                    zf.write(clip_path, f"{clip_path.parent.name}/{clip_path.name}")

    return str(zip_path) if zip_path.exists() else None


# Batch state (module level for simplicity)
batch_state = {
    "is_running": False,
    "should_cancel": False,
    "results": [],
    "output_dir": None,
}


def _batch_abort(message: str):
    """Build the 7-tuple returned to the UI when a batch cannot start."""
    return message, None, "", "", None, None, None


def run_batch_tests(
    videos,
    domains,
    durations,
    num_clips,
    reference_image,
    include_no_prompt,
    prompt1,
    prompt2,
    prompt3,
    progress=gr.Progress(),
):
    """
    Main batch testing function.

    Runs every combination of the selected videos, domains, durations and
    prompts sequentially, checking the cancel flag between tests.

    Returns:
        Tuple of (status, results_dataframe, log_text, json_text, csv_path,
        json_path, zip_path).
    """
    # Validate inputs
    if not videos:
        return _batch_abort("Please upload at least one video.")
    if not domains:
        return _batch_abort("Please select at least one domain.")
    if not durations:
        return _batch_abort("Please select at least one duration.")

    # Collect prompts
    prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]

    # Generate test queue
    queue = generate_test_queue(
        videos=videos,
        domains=domains,
        durations=durations,
        num_clips=int(num_clips),
        ref_image=reference_image,
        prompts=prompts,
        include_no_prompt=include_no_prompt,
    )

    if not queue:
        return _batch_abort("No tests to run. Please check your configuration.")

    # Initialize batch state
    batch_state["is_running"] = True
    batch_state["should_cancel"] = False
    batch_state["results"] = []
    batch_state["output_dir"] = Path(tempfile.mkdtemp(prefix="shortsmith_batch_"))

    total_tests = len(queue)
    log_messages: List[str] = []
    log = _make_log(log_messages)

    log(f"Starting batch testing: {total_tests} tests")
    log(
        f"Videos: {len(videos)}, Domains: {len(domains)}, Durations: {len(durations)}, "
        f"Prompts: {_count_prompt_variants(prompts, include_no_prompt)}"
    )

    try:
        # Run tests sequentially
        for i, test_config in enumerate(queue):
            if batch_state["should_cancel"]:
                log("Batch cancelled by user")
                break

            video_name = test_config["video_name"]
            domain = test_config["domain_value"]
            duration = test_config["clip_duration"]
            prompt = test_config["custom_prompt"] or "no-prompt"

            log(f"[{i+1}/{total_tests}] Testing: {video_name} | {domain} | {duration}s | {prompt[:30]}...")
            # Report the fraction completed so far, so the bar does not read
            # 100% while the last test is still running.
            progress(i / total_tests, desc=f"Test {i+1}/{total_tests}: {video_name}")

            # Run the test
            result = run_single_batch_test(test_config, batch_state["output_dir"])
            batch_state["results"].append(result)

            if result["status"] == "success":
                log(f" ✓ Completed in {result['processing_time']}s")
            else:
                log(f" ✗ Failed: {result.get('error', 'Unknown error')}")
    finally:
        # Always clear the flag, even if a test or progress update raised
        batch_state["is_running"] = False

    progress(1.0, desc="Finalizing results...")

    # Finalize
    results = batch_state["results"]
    completed = sum(1 for r in results if r["status"] == "success")
    failed = sum(1 for r in results if r["status"] == "failed")

    log(f"Batch complete: {completed} succeeded, {failed} failed")

    # Generate outputs
    results_df = results_to_dataframe(results)
    csv_content = results_to_csv(results)
    json_content = results_to_json(results)

    # Save CSV and JSON to files for download; explicit UTF-8 so prompts with
    # non-ASCII characters do not depend on the platform default encoding.
    csv_path = batch_state["output_dir"] / "results.csv"
    json_path = batch_state["output_dir"] / "results.json"
    csv_path.write_text(csv_content, encoding="utf-8")
    json_path.write_text(json_content, encoding="utf-8")

    # Create ZIP of clips
    zip_path = create_clips_zip(results)

    status = f"Batch complete: {completed}/{total_tests} tests succeeded"

    return (
        status,
        results_df,
        "\n".join(log_messages),
        json_content,
        str(csv_path),
        str(json_path),
        zip_path,
    )


def cancel_batch():
    """Request cancellation; the batch loop checks the flag between tests."""
    batch_state["should_cancel"] = True
    return "Cancelling batch... (will stop after current test completes)"


def calculate_queue_size(videos, domains, durations, include_no_prompt, prompt1, prompt2, prompt3):
    """Calculate and display the queue size as a Markdown string."""
    num_videos = len(videos) if videos else 0
    num_domains = len(domains) if domains else 0
    num_durations = len(durations) if durations else 0

    prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]
    # Defaults to a single no-prompt variant if nothing is selected
    num_prompts = _count_prompt_variants(prompts, include_no_prompt)

    total = num_videos * num_domains * num_durations * num_prompts

    return f"Queue: {num_videos} video(s) × {num_domains} domain(s) × {num_durations} duration(s) × {num_prompts} prompt(s) = **{total} tests**"


# =============================================================================
# Build Gradio Interface
# =============================================================================

with gr.Blocks(
    title="ShortSmith v2",
    theme=gr.themes.Soft(),
    css="""
    .container { max-width: 1200px; margin: auto; }
    .output-video { min-height: 200px; }
    """,
) as demo:

    gr.Markdown("""
    # ShortSmith v2
    ### AI-Powered Video Highlight Extractor

    Upload a video and automatically extract the most engaging highlight clips using AI analysis.
    """)

    with gr.Tabs():
        # =================================================================
        # Tab 1: Single Video
        # =================================================================
        with gr.TabItem("Single Video"):
            with gr.Row():
                # Left column - Inputs
                with gr.Column(scale=1):
                    gr.Markdown("### Input")

                    video_input = gr.Video(
                        label="Upload Video",
                        sources=["upload"],
                    )

                    with gr.Accordion("Settings", open=True):
                        domain_dropdown = gr.Dropdown(
                            choices=DOMAIN_CHOICES,
                            value="General",
                            label="Content Domain",
                            info="Select the type of content for optimized scoring",
                        )

                        with gr.Row():
                            num_clips_slider = gr.Slider(
                                minimum=1,
                                maximum=3,
                                value=3,
                                step=1,
                                label="Number of Clips",
                                info="How many highlight clips to extract",
                            )
                            duration_slider = gr.Slider(
                                minimum=5,
                                maximum=30,
                                value=15,
                                step=1,
                                label="Clip Duration (seconds)",
                                info="Target duration for each clip",
                            )

                    with gr.Accordion("Person Filtering (Optional)", open=False):
                        reference_image = gr.Image(
                            label="Reference Image",
                            type="filepath",
                            sources=["upload"],
                        )
                        gr.Markdown("*Upload a photo of a person to prioritize clips featuring them.*")

                    with gr.Accordion("Custom Instructions (Optional)", open=False):
                        custom_prompt = gr.Textbox(
                            label="Additional Instructions",
                            placeholder="E.g., 'Focus on crowd reactions' or 'Prioritize action scenes'",
                            lines=2,
                        )

                    process_btn = gr.Button(
                        "Extract Highlights",
                        variant="primary",
                        size="lg",
                    )

                # Right column - Outputs
                with gr.Column(scale=1):
                    gr.Markdown("### Output")

                    status_output = gr.Textbox(
                        label="Status",
                        lines=2,
                        interactive=False,
                    )

                    gr.Markdown("#### Extracted Clips")
                    clip1_output = gr.Video(label="Clip 1", elem_classes=["output-video"])
                    clip2_output = gr.Video(label="Clip 2", elem_classes=["output-video"])
                    clip3_output = gr.Video(label="Clip 3", elem_classes=["output-video"])

                    with gr.Accordion("Processing Log", open=True):
                        log_output = gr.Textbox(
                            label="Log",
                            lines=10,
                            interactive=False,
                            show_copy_button=True,
                        )

                    with gr.Accordion("Automated Metrics (System-Generated)", open=True):
                        metrics_output = gr.Textbox(
                            label="Metrics for Testing",
                            lines=20,
                            interactive=False,
                            show_copy_button=True,
                            info="Copy these metrics for evaluation spreadsheets",
                        )

            # Connect single video processing
            process_btn.click(
                fn=process_video,
                inputs=[
                    video_input,
                    domain_dropdown,
                    num_clips_slider,
                    duration_slider,
                    reference_image,
                    custom_prompt,
                ],
                outputs=[
                    status_output,
                    clip1_output,
                    clip2_output,
                    clip3_output,
                    log_output,
                    metrics_output,
                ],
                show_progress="full",
            )

        # =================================================================
        # Tab 2: Batch Testing
        # =================================================================
        with gr.TabItem("Batch Testing"):
            with gr.Row():
                # Left column - Configuration
                with gr.Column(scale=1):
                    gr.Markdown("### Batch Configuration")

                    batch_videos = gr.File(
                        label="Upload Video(s)",
                        file_count="multiple",
                        file_types=["video"],
                    )

                    gr.Markdown("#### Domains to Test")
                    batch_domains = gr.CheckboxGroup(
                        choices=DOMAIN_CHOICES,
                        value=["General"],
                        label="Select domains",
                    )

                    gr.Markdown("#### Clip Durations to Test")
                    batch_durations = gr.CheckboxGroup(
                        choices=[10, 15, 20, 30],
                        value=[15],
                        label="Select durations (seconds)",
                    )

                    batch_num_clips = gr.Slider(
                        minimum=1,
                        maximum=3,
                        value=3,
                        step=1,
                        label="Number of Clips per Test",
                    )

                    with gr.Accordion("Custom Prompts", open=True):
                        batch_no_prompt = gr.Checkbox(
                            label="Include no-prompt baseline",
                            value=True,
                            info="Test without any custom prompt for comparison",
                        )
                        batch_prompt1 = gr.Textbox(
                            label="Prompt 1",
                            placeholder="E.g., 'Focus on action moments'",
                            lines=1,
                        )
                        batch_prompt2 = gr.Textbox(
                            label="Prompt 2",
                            placeholder="E.g., 'Find crowd reactions'",
                            lines=1,
                        )
                        batch_prompt3 = gr.Textbox(
                            label="Prompt 3",
                            placeholder="E.g., 'Prioritize emotional moments'",
                            lines=1,
                        )

                    with gr.Accordion("Reference Image (Optional)", open=False):
                        batch_ref_image = gr.Image(
                            label="Reference Image (applies to all tests)",
                            type="filepath",
                            sources=["upload"],
                        )

                    # Queue size indicator
                    queue_info = gr.Markdown("Queue: 0 tests")

                    with gr.Row():
                        batch_start_btn = gr.Button(
                            "Start Batch",
                            variant="primary",
                            size="lg",
                        )
                        batch_cancel_btn = gr.Button(
                            "Cancel",
                            variant="secondary",
                            size="lg",
                        )

                # Right column - Results
                with gr.Column(scale=1):
                    gr.Markdown("### Results")

                    batch_status = gr.Textbox(
                        label="Status",
                        lines=2,
                        interactive=False,
                    )

                    batch_results_table = gr.Dataframe(
                        label="Test Results",
                        headers=["Test ID", "Video", "Domain", "Duration", "Prompt", "Status", "Time (s)", "Frames", "Hooks"],
                        interactive=False,
                    )

                    with gr.Accordion("Processing Log", open=True):
                        batch_log = gr.Textbox(
                            label="Log",
                            lines=15,
                            interactive=False,
                            show_copy_button=True,
                        )

                    with gr.Accordion("Full Results (JSON)", open=False):
                        batch_json = gr.Textbox(
                            label="JSON Output",
                            lines=10,
                            interactive=False,
                            show_copy_button=True,
                        )

                    gr.Markdown("#### Download Results")
                    with gr.Row():
                        csv_download = gr.File(label="CSV Results")
                        json_download = gr.File(label="JSON Results")
                        zip_download = gr.File(label="All Clips (ZIP)")

            # Update queue size when inputs change
            queue_inputs = [
                batch_videos,
                batch_domains,
                batch_durations,
                batch_no_prompt,
                batch_prompt1,
                batch_prompt2,
                batch_prompt3,
            ]
            for inp in queue_inputs:
                inp.change(
                    fn=calculate_queue_size,
                    inputs=queue_inputs,
                    outputs=queue_info,
                )

            # Connect batch processing
            batch_start_btn.click(
                fn=run_batch_tests,
                inputs=[
                    batch_videos,
                    batch_domains,
                    batch_durations,
                    batch_num_clips,
                    batch_ref_image,
                    batch_no_prompt,
                    batch_prompt1,
                    batch_prompt2,
                    batch_prompt3,
                ],
                outputs=[
                    batch_status,
                    batch_results_table,
                    batch_log,
                    batch_json,
                    csv_download,
                    json_download,
                    zip_download,
                ],
                show_progress="full",
            )

            batch_cancel_btn.click(
                fn=cancel_batch,
                inputs=[],
                outputs=[batch_status],
            )

    gr.Markdown("""
    ---
    **ShortSmith v2** | Powered by Qwen2-VL, InsightFace, and Librosa |
    [GitHub](https://github.com) | Built with Gradio
    """)


# Launch the app
if __name__ == "__main__":
    demo.queue()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )
else:
    # For HuggingFace Spaces
    # NOTE(review): this launches the server on import as well; kept because
    # the deployment may rely on it - confirm before removing.
    demo.queue()
    demo.launch()