"""
ShortSmith v2 - Gradio Application

Hugging Face Space interface for video highlight extraction.

Features:
- Multi-modal analysis (visual + audio + motion)
- Domain-optimized presets
- Person-specific filtering (optional)
- Scene-aware clip cutting
- Batch testing with parameter variations
"""
# Standard library
import os
import sys
import tempfile
import shutil
import json
import zipfile
from pathlib import Path
import time
import traceback
from typing import List, Dict, Any, Optional

# Third-party
import gradio as gr
import pandas as pd

# Add project root to path so the local `utils` and `pipeline` packages
# resolve regardless of the working directory the Space launches from.
sys.path.insert(0, str(Path(__file__).parent))
# Initialize logging
def _init_logger():
    """Return the app logger, preferring the project's logging setup.

    Falls back to a plain stdlib logger if `utils.logger` is unavailable
    or its setup raises for any reason.
    """
    try:
        from utils.logger import setup_logging, get_logger
        setup_logging(log_level="INFO", log_to_console=True)
        return get_logger("app")
    except Exception:
        import logging
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger("app")


logger = _init_logger()
| # ============================================================================= | |
| # Shared Utilities | |
| # ============================================================================= | |
def build_metrics_output(result, domain: str, custom_prompt: Optional[str] = None) -> str:
    """
    Build formatted metrics output for testing and evaluation.

    Args:
        result: PipelineResult object (attributes used: processing_time,
            visual_features, scenes, audio_features, scores, metadata, clips).
        domain: Content domain used for processing.
        custom_prompt: Custom prompt used (if any).

    Returns:
        Formatted string with all metrics.
    """
    # BUGFIX: result.scores may be None; normalise once so both the hook
    # count and the per-clip hook lookup below are safe.
    scores = result.scores or []

    lines = []
    lines.append("=" * 50)
    lines.append("AUTOMATED METRICS (System-Generated)")
    lines.append("=" * 50)
    lines.append("")

    # Processing Metrics
    lines.append("PROCESSING METRICS")
    lines.append("-" * 30)
    lines.append(f"processing_time_seconds: {result.processing_time:.2f}")
    lines.append(f"frames_analyzed: {len(result.visual_features)}")
    lines.append(f"scenes_detected: {len(result.scenes)}")
    lines.append(f"audio_segments_analyzed: {len(result.audio_features)}")
    lines.append(f"domain: {domain}")
    lines.append(f"custom_prompt: {custom_prompt if custom_prompt else 'none'}")

    # Count hooks from scores (estimate based on high-scoring segments)
    hooks_detected = sum(1 for s in scores if s.combined_score > 0.7)
    lines.append(f"hooks_detected: {hooks_detected}")

    if result.metadata:
        lines.append(f"video_duration_seconds: {result.metadata.duration:.2f}")
        lines.append(f"video_resolution: {result.metadata.resolution}")
        lines.append(f"video_fps: {result.metadata.fps:.2f}")
    lines.append("")

    # Per Clip Metrics
    lines.append("PER CLIP METRICS")
    lines.append("-" * 30)
    for i, clip in enumerate(result.clips):
        lines.append("")
        lines.append(f"[Clip {i + 1}]")
        lines.append(f"  clip_id: {i + 1}")
        lines.append(f"  start_time: {clip.start_time:.2f}")
        lines.append(f"  end_time: {clip.end_time:.2f}")
        lines.append(f"  duration: {clip.duration:.2f}")
        lines.append(f"  hype_score: {clip.hype_score:.4f}")
        lines.append(f"  visual_score: {clip.visual_score:.4f}")
        lines.append(f"  audio_score: {clip.audio_score:.4f}")
        lines.append(f"  motion_score: {clip.motion_score:.4f}")

        # Hook info - derived from the first segment score whose start lies
        # within 1 second of the clip start.
        hook_type = "none"
        hook_confidence = 0.0
        for score in scores:
            if abs(score.start_time - clip.start_time) < 1.0:
                if score.combined_score > 0.7:
                    hook_confidence = score.combined_score
                    # Infer hook type from the dominant score component.
                    if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
                        hook_type = "audio_peak"
                    elif score.motion_score > score.visual_score:
                        hook_type = "motion_spike"
                    else:
                        hook_type = "visual_highlight"
                break
        lines.append(f"  hook_type: {hook_type}")
        lines.append(f"  hook_confidence: {hook_confidence:.4f}")

        if clip.person_detected:
            lines.append("  person_detected: True")
            lines.append(f"  person_screen_time: {clip.person_screen_time:.4f}")

    lines.append("")
    lines.append("=" * 50)
    lines.append("END METRICS")
    lines.append("=" * 50)
    return "\n".join(lines)
| # ============================================================================= | |
| # Single Video Processing | |
| # ============================================================================= | |
def process_video(
    video_file,
    domain,
    num_clips,
    clip_duration,
    reference_image,
    custom_prompt,
    progress=gr.Progress()
):
    """
    Main video processing function for single video mode.

    Args:
        video_file: Uploaded video file path.
        domain: Content domain for scoring weights (display name, e.g. "Sports").
        num_clips: Number of clips to extract.
        clip_duration: Duration of each clip in seconds.
        reference_image: Optional reference image for person filtering.
        custom_prompt: Optional custom instructions.
        progress: Gradio progress tracker.

    Returns:
        Tuple of (status_message, clip1, clip2, clip3, log_text, metrics_text).
        Clip slots are None when fewer clips were produced or on error.
    """
    if video_file is None:
        return "Please upload a video first.", None, None, None, "", ""

    log_messages = []

    def log(msg):
        # Collect timestamped lines for the UI log panel and mirror to the app logger.
        log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
        logger.info(msg)

    try:
        video_path = Path(video_file)
        log(f"Processing video: {video_path.name}")
        progress(0.05, desc="Validating video...")

        # Import pipeline components lazily so the UI can load even if
        # heavyweight model dependencies are slow to import.
        from utils.helpers import validate_video_file, validate_image_file, format_duration
        from pipeline.orchestrator import PipelineOrchestrator

        # Validate video
        validation = validate_video_file(video_file)
        if not validation.is_valid:
            return f"Error: {validation.error_message}", None, None, None, "\n".join(log_messages), ""
        log(f"Video size: {validation.file_size / (1024*1024):.1f} MB")

        # Validate reference image if provided; an invalid image only warns,
        # processing continues without person filtering.
        ref_path = None
        if reference_image is not None:
            ref_validation = validate_image_file(reference_image)
            if ref_validation.is_valid:
                ref_path = reference_image
                log(f"Reference image: {Path(reference_image).name}")
            else:
                log(f"Warning: Invalid reference image - {ref_validation.error_message}")

        # Map domain display name to internal value
        domain_map = {
            "Sports": "sports",
            "Vlogs": "vlogs",
            "Music Videos": "music",
            "Podcasts": "podcasts",
            "Gaming": "gaming",
            "General": "general",
        }
        domain_value = domain_map.get(domain, "general")
        log(f"Domain: {domain_value}")

        # Create output directory (temp dir; clips are copied here for the UI)
        output_dir = Path(tempfile.mkdtemp(prefix="shortsmith_output_"))
        log(f"Output directory: {output_dir}")

        # Progress callback to update UI during processing
        def on_progress(pipeline_progress):
            stage = pipeline_progress.stage.value
            pct = pipeline_progress.progress
            msg = pipeline_progress.message
            log(f"[{stage}] {msg}")
            # Map pipeline progress (0-1) to our range (0.1-0.9)
            mapped_progress = 0.1 + (pct * 0.8)
            progress(mapped_progress, desc=f"{stage}: {msg}")

        # Initialize pipeline
        progress(0.1, desc="Initializing AI models...")
        log("Initializing pipeline...")
        pipeline = PipelineOrchestrator(progress_callback=on_progress)

        # BUGFIX: cleanup() was previously only called on the success/failure
        # paths, so an exception during process() leaked pipeline resources.
        try:
            # Process video
            progress(0.15, desc="Starting analysis...")
            log(f"Processing: {int(num_clips)} clips @ {int(clip_duration)}s each")
            result = pipeline.process(
                video_path=video_path,
                num_clips=int(num_clips),
                clip_duration=float(clip_duration),
                domain=domain_value,
                reference_image=ref_path,
                custom_prompt=custom_prompt.strip() if custom_prompt else None,
            )
            progress(0.9, desc="Extracting clips...")

            # Handle result
            if result.success:
                log(f"Processing complete in {result.processing_time:.1f}s")
                clip_paths = []
                for i, clip in enumerate(result.clips):
                    if clip.clip_path.exists():
                        output_path = output_dir / f"highlight_{i+1}.mp4"
                        shutil.copy2(clip.clip_path, output_path)
                        clip_paths.append(str(output_path))
                        log(f"Clip {i+1}: {format_duration(clip.start_time)} - {format_duration(clip.end_time)} (score: {clip.hype_score:.2f})")
                status = f"Successfully extracted {len(clip_paths)} highlight clips!\nProcessing time: {result.processing_time:.1f}s"
                # Build metrics output
                metrics_output = build_metrics_output(result, domain_value, custom_prompt.strip() if custom_prompt else None)
                progress(1.0, desc="Done!")
                # Return up to 3 clips
                clip1 = clip_paths[0] if len(clip_paths) > 0 else None
                clip2 = clip_paths[1] if len(clip_paths) > 1 else None
                clip3 = clip_paths[2] if len(clip_paths) > 2 else None
                return status, clip1, clip2, clip3, "\n".join(log_messages), metrics_output
            else:
                log(f"Processing failed: {result.error_message}")
                return f"Error: {result.error_message}", None, None, None, "\n".join(log_messages), ""
        finally:
            pipeline.cleanup()
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log(error_msg)
        log(traceback.format_exc())
        logger.exception("Pipeline error")
        return error_msg, None, None, None, "\n".join(log_messages), ""
| # ============================================================================= | |
| # Batch Testing Functions | |
| # ============================================================================= | |
def generate_test_queue(
    videos: List[str],
    domains: List[str],
    durations: List[int],
    num_clips: int,
    ref_image: Optional[str],
    prompts: List[str],
    include_no_prompt: bool
) -> List[Dict[str, Any]]:
    """Generate all parameter combinations to test (cartesian product).

    Each queue entry is a dict consumed by run_single_batch_test(); a
    custom_prompt of None marks the no-prompt baseline run.
    """
    # Assemble the prompt axis: optional no-prompt baseline first, then
    # every non-empty prompt (whitespace-trimmed).
    cleaned_prompts = [p.strip() for p in prompts if p and p.strip()]
    prompt_axis: List[Optional[str]] = ([None] if include_no_prompt else []) + cleaned_prompts
    if not prompt_axis:
        prompt_axis = [None]  # nothing selected: fall back to a single baseline

    # Display name -> internal domain identifier
    display_to_value = {
        "Sports": "sports",
        "Vlogs": "vlogs",
        "Music Videos": "music",
        "Podcasts": "podcasts",
        "Gaming": "gaming",
        "General": "general",
    }

    queue: List[Dict[str, Any]] = []
    for video in videos:
        video_name = Path(video).name if video else "unknown"
        for domain in domains:
            for duration in durations:
                for prompt in prompt_axis:
                    queue.append({
                        "test_id": len(queue) + 1,  # 1-based sequential id
                        "video_path": video,
                        "video_name": video_name,
                        "domain": domain,
                        "domain_value": display_to_value.get(domain, "general"),
                        "clip_duration": duration,
                        "num_clips": num_clips,
                        "reference_image": ref_image,
                        "custom_prompt": prompt,
                    })
    return queue
def run_single_batch_test(config: Dict[str, Any], output_base_dir: Path) -> Dict[str, Any]:
    """Run a single test from the batch queue.

    Args:
        config: One entry produced by generate_test_queue().
        output_base_dir: Root directory receiving a per-test clip subfolder.

    Returns:
        Result dict with status ("success"/"failed"), error text, timing,
        analysis counters, per-clip metric dicts, and copied clip paths.
    """
    from utils.helpers import validate_video_file
    from pipeline.orchestrator import PipelineOrchestrator

    test_id = config["test_id"]
    video_path = config["video_path"]
    video_name = config["video_name"]
    domain_value = config["domain_value"]
    duration = config["clip_duration"]
    num_clips = config["num_clips"]
    ref_image = config["reference_image"]
    custom_prompt = config["custom_prompt"]

    # Create unique output folder for this test (prompt hashed to keep the
    # folder name short and filesystem-safe).
    prompt_suffix = "no_prompt" if not custom_prompt else f"prompt_{hash(custom_prompt) % 1000}"
    test_folder = f"{Path(video_name).stem}_{domain_value}_{duration}s_{prompt_suffix}"
    output_dir = output_base_dir / test_folder
    output_dir.mkdir(parents=True, exist_ok=True)

    result_data = {
        "test_id": test_id,
        "video_name": video_name,
        "domain": domain_value,
        "clip_duration": duration,
        "custom_prompt": custom_prompt if custom_prompt else "none",
        "num_clips": num_clips,
        "status": "failed",
        "error": None,
        "processing_time": 0,
        "frames_analyzed": 0,
        "scenes_detected": 0,
        "hooks_detected": 0,
        "clips": [],
        "clip_paths": [],
    }
    pipeline = None
    try:
        # Validate video
        validation = validate_video_file(video_path)
        if not validation.is_valid:
            result_data["error"] = validation.error_message
            return result_data

        # Initialize and run pipeline
        pipeline = PipelineOrchestrator()
        result = pipeline.process(
            video_path=video_path,
            num_clips=num_clips,
            clip_duration=float(duration),
            domain=domain_value,
            reference_image=ref_image,
            custom_prompt=custom_prompt,
        )
        if result.success:
            # BUGFIX: guard result.scores (may be None) before iterating.
            scores = result.scores or []
            result_data["status"] = "success"
            result_data["processing_time"] = round(result.processing_time, 2)
            result_data["frames_analyzed"] = len(result.visual_features)
            result_data["scenes_detected"] = len(result.scenes)
            result_data["hooks_detected"] = sum(1 for s in scores if s.combined_score > 0.7)
            # Copy clips and collect data
            for i, clip in enumerate(result.clips):
                if clip.clip_path.exists():
                    clip_output = output_dir / f"clip_{i+1}.mp4"
                    shutil.copy2(clip.clip_path, clip_output)
                    result_data["clip_paths"].append(str(clip_output))
                # Find hook type for this clip: first segment score starting
                # within 1s of the clip start; infer type from the dominant
                # score component.
                hook_type = "none"
                hook_confidence = 0.0
                for score in scores:
                    if abs(score.start_time - clip.start_time) < 1.0:
                        if score.combined_score > 0.7:
                            hook_confidence = score.combined_score
                            if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
                                hook_type = "audio_peak"
                            elif score.motion_score > score.visual_score:
                                hook_type = "motion_spike"
                            else:
                                hook_type = "visual_highlight"
                        break
                result_data["clips"].append({
                    "clip_id": i + 1,
                    "start_time": round(clip.start_time, 2),
                    "end_time": round(clip.end_time, 2),
                    "duration": round(clip.duration, 2),
                    "hype_score": round(clip.hype_score, 4),
                    "visual_score": round(clip.visual_score, 4),
                    "audio_score": round(clip.audio_score, 4),
                    "motion_score": round(clip.motion_score, 4),
                    "hook_type": hook_type,
                    "hook_confidence": round(hook_confidence, 4),
                })
        else:
            result_data["error"] = result.error_message
    except Exception as e:
        result_data["error"] = str(e)
        logger.exception(f"Batch test {test_id} failed")
    finally:
        # BUGFIX: cleanup() was previously unreachable when process() raised,
        # leaking pipeline resources across batch tests.
        if pipeline is not None:
            pipeline.cleanup()
    return result_data
def results_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """Convert batch results to a pandas DataFrame for display.

    Prompt text is truncated to 20 chars for the table; up to the first
    three clips contribute a "Clip N Hype" column each.
    """
    table = []
    for entry in results:
        prompt_text = entry["custom_prompt"]
        if len(prompt_text) > 20:
            prompt_text = prompt_text[:20] + "..."
        row = {
            "Test ID": entry["test_id"],
            "Video": entry["video_name"],
            "Domain": entry["domain"],
            "Duration": f"{entry['clip_duration']}s",
            "Prompt": prompt_text,
            "Status": entry["status"],
            "Time (s)": entry["processing_time"],
            "Frames": entry["frames_analyzed"],
            "Hooks": entry["hooks_detected"],
        }
        for idx, clip in enumerate(entry.get("clips", [])[:3]):
            row[f"Clip {idx + 1} Hype"] = clip.get("hype_score", 0)
        table.append(row)
    return pd.DataFrame(table)
def results_to_csv(results: List[Dict[str, Any]]) -> str:
    """Convert results to CSV format.

    Emits one row per test with scalar metrics first, then a fixed set of
    columns for up to three clips (blank when a clip is absent) so every
    row has an identical schema.
    """
    # (csv column suffix, clip dict key) pairs, in output order
    clip_columns = (
        ("start", "start_time"),
        ("end", "end_time"),
        ("hype", "hype_score"),
        ("visual", "visual_score"),
        ("audio", "audio_score"),
        ("motion", "motion_score"),
        ("hook_type", "hook_type"),
    )
    rows = []
    for entry in results:
        row = {
            "test_id": entry["test_id"],
            "video_name": entry["video_name"],
            "domain": entry["domain"],
            "clip_duration": entry["clip_duration"],
            "custom_prompt": entry["custom_prompt"],
            "num_clips": entry["num_clips"],
            "status": entry["status"],
            "error": entry.get("error", ""),
            "processing_time": entry["processing_time"],
            "frames_analyzed": entry["frames_analyzed"],
            "scenes_detected": entry["scenes_detected"],
            "hooks_detected": entry["hooks_detected"],
        }
        clips = entry.get("clips", [])
        for i in range(3):
            clip = clips[i] if i < len(clips) else None
            for suffix, key in clip_columns:
                row[f"clip_{i+1}_{suffix}"] = clip[key] if clip is not None else ""
        rows.append(row)
    return pd.DataFrame(rows).to_csv(index=False)
def results_to_json(results: List[Dict[str, Any]]) -> str:
    """Convert results to JSON format.

    clip_paths entries are dropped from the export: they point at temp
    files that won't exist outside this session.
    """
    exportable = [
        {key: value for key, value in entry.items() if key != "clip_paths"}
        for entry in results
    ]
    return json.dumps(exportable, indent=2)
def create_clips_zip(results: List[Dict[str, Any]]) -> Optional[str]:
    """Create a ZIP file of all extracted clips.

    Clips from each successful test are archived under a folder named
    after the video/domain/duration combination (with a "_prompt" suffix
    when a custom prompt was used). Returns the archive path, or None if
    the file could not be created.
    """
    archive = Path(tempfile.mkdtemp()) / "batch_clips.zip"
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zf:
        for entry in results:
            if entry["status"] != "success":
                continue
            folder = f"{Path(entry['video_name']).stem}_{entry['domain']}_{entry['clip_duration']}s"
            if entry["custom_prompt"] != "none":
                folder += "_prompt"
            for clip_path in entry.get("clip_paths", []):
                src = Path(clip_path)
                if src.exists():
                    zf.write(clip_path, f"{folder}/{src.name}")
    return str(archive) if archive.exists() else None
# Batch state (module level for simplicity).
# Written by run_batch_tests() and flagged by cancel_batch().
# NOTE(review): not lock-protected — appears to assume Gradio serializes
# these callbacks; confirm before allowing concurrent batch runs.
batch_state = {
    "is_running": False,      # True while a batch run is executing
    "should_cancel": False,   # set True to stop after the current test
    "results": [],            # accumulated per-test result dicts
    "output_dir": None,       # Path of temp dir receiving clips/results
}
def run_batch_tests(
    videos,
    domains,
    durations,
    num_clips,
    reference_image,
    include_no_prompt,
    prompt1,
    prompt2,
    prompt3,
    progress=gr.Progress()
):
    """Main batch testing function.

    Runs every video x domain x duration x prompt combination sequentially
    and returns a 7-tuple for the Gradio outputs:
    (status, results_dataframe, log_text, json_text, csv_path, json_path, zip_path).
    On input-validation failure the dataframe/file slots are None.
    """
    global batch_state
    # Validate inputs
    if not videos:
        return "Please upload at least one video.", None, "", "", None, None, None
    if not domains:
        return "Please select at least one domain.", None, "", "", None, None, None
    if not durations:
        return "Please select at least one duration.", None, "", "", None, None, None
    # Collect prompts (drop blanks)
    prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]
    # Generate test queue (cartesian product of all parameter axes)
    queue = generate_test_queue(
        videos=videos,
        domains=domains,
        durations=durations,
        num_clips=int(num_clips),
        ref_image=reference_image,
        prompts=prompts,
        include_no_prompt=include_no_prompt,
    )
    if not queue:
        return "No tests to run. Please check your configuration.", None, "", "", None, None, None
    # Initialize batch state (shared with cancel_batch())
    batch_state["is_running"] = True
    batch_state["should_cancel"] = False
    batch_state["results"] = []
    batch_state["output_dir"] = Path(tempfile.mkdtemp(prefix="shortsmith_batch_"))
    total_tests = len(queue)
    log_messages = []
    def log(msg):
        # Timestamped UI log line, mirrored to the app logger
        log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
        logger.info(msg)
    log(f"Starting batch testing: {total_tests} tests")
    log(f"Videos: {len(videos)}, Domains: {len(domains)}, Durations: {len(durations)}, Prompts: {len(prompts) + (1 if include_no_prompt else 0)}")
    # Run tests sequentially; cancel_batch() can stop the loop between tests
    for i, test_config in enumerate(queue):
        if batch_state["should_cancel"]:
            log("Batch cancelled by user")
            break
        test_id = test_config["test_id"]
        video_name = test_config["video_name"]
        domain = test_config["domain_value"]
        duration = test_config["clip_duration"]
        prompt = test_config["custom_prompt"] or "no-prompt"
        log(f"[{i+1}/{total_tests}] Testing: {video_name} | {domain} | {duration}s | {prompt[:30]}...")
        progress((i + 1) / total_tests, desc=f"Test {i+1}/{total_tests}: {video_name}")
        # Run the test (errors are captured inside run_single_batch_test)
        result = run_single_batch_test(test_config, batch_state["output_dir"])
        batch_state["results"].append(result)
        if result["status"] == "success":
            log(f"  ✓ Completed in {result['processing_time']}s")
        else:
            log(f"  ✗ Failed: {result.get('error', 'Unknown error')}")
    # Finalize
    batch_state["is_running"] = False
    completed = len([r for r in batch_state["results"] if r["status"] == "success"])
    failed = len([r for r in batch_state["results"] if r["status"] == "failed"])
    log(f"Batch complete: {completed} succeeded, {failed} failed")
    # Generate outputs in every export format
    results_df = results_to_dataframe(batch_state["results"])
    csv_content = results_to_csv(batch_state["results"])
    json_content = results_to_json(batch_state["results"])
    # Save CSV and JSON to files for download
    csv_path = batch_state["output_dir"] / "results.csv"
    json_path = batch_state["output_dir"] / "results.json"
    csv_path.write_text(csv_content)
    json_path.write_text(json_content)
    # Create ZIP of clips
    zip_path = create_clips_zip(batch_state["results"])
    status = f"Batch complete: {completed}/{total_tests} tests succeeded"
    return (
        status,
        results_df,
        "\n".join(log_messages),
        json_content,
        str(csv_path),
        str(json_path),
        zip_path,
    )
def cancel_batch():
    """Request cancellation of the in-flight batch run.

    Only sets a flag; run_batch_tests() checks it between tests, so the
    currently running test always finishes first.
    """
    # Mutating (not rebinding) the module-level dict, so `global` is not needed.
    batch_state["should_cancel"] = True
    return "Cancelling batch... (will stop after current test completes)"
def calculate_queue_size(videos, domains, durations, include_no_prompt, prompt1, prompt2, prompt3):
    """Calculate and display the queue size.

    Mirrors generate_test_queue(): total = videos x domains x durations x
    prompts, where the prompt count falls back to 1 (no-prompt baseline)
    when nothing is selected.
    """
    n_videos, n_domains, n_durations = (
        len(x) if x else 0 for x in (videos, domains, durations)
    )
    filled_prompts = [p for p in (prompt1, prompt2, prompt3) if p and p.strip()]
    n_prompts = len(filled_prompts) + (1 if include_no_prompt else 0)
    if not n_prompts:
        n_prompts = 1  # default to the no-prompt baseline
    total = n_videos * n_domains * n_durations * n_prompts
    return (
        f"Queue: {n_videos} video(s) × {n_domains} domain(s) × "
        f"{n_durations} duration(s) × {n_prompts} prompt(s) = **{total} tests**"
    )
| # ============================================================================= | |
| # Build Gradio Interface | |
| # ============================================================================= | |
# =============================================================================
# Gradio UI definition. `demo` is the Blocks app launched at the bottom of
# the file. Two tabs: single-video extraction and batch parameter testing.
# =============================================================================
with gr.Blocks(
    title="ShortSmith v2",
    theme=gr.themes.Soft(),
    css="""
    .container { max-width: 1200px; margin: auto; }
    .output-video { min-height: 200px; }
    """
) as demo:
    gr.Markdown("""
    # ShortSmith v2
    ### AI-Powered Video Highlight Extractor
    Upload a video and automatically extract the most engaging highlight clips using AI analysis.
    """)
    with gr.Tabs():
        # =================================================================
        # Tab 1: Single Video
        # =================================================================
        with gr.TabItem("Single Video"):
            with gr.Row():
                # Left column - Inputs
                with gr.Column(scale=1):
                    gr.Markdown("### Input")
                    video_input = gr.Video(
                        label="Upload Video",
                        sources=["upload"],
                    )
                    with gr.Accordion("Settings", open=True):
                        domain_dropdown = gr.Dropdown(
                            choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "General"],
                            value="General",
                            label="Content Domain",
                            info="Select the type of content for optimized scoring"
                        )
                        with gr.Row():
                            num_clips_slider = gr.Slider(
                                minimum=1,
                                maximum=3,
                                value=3,
                                step=1,
                                label="Number of Clips",
                                info="How many highlight clips to extract"
                            )
                            duration_slider = gr.Slider(
                                minimum=5,
                                maximum=30,
                                value=15,
                                step=1,
                                label="Clip Duration (seconds)",
                                info="Target duration for each clip"
                            )
                    with gr.Accordion("Person Filtering (Optional)", open=False):
                        reference_image = gr.Image(
                            label="Reference Image",
                            type="filepath",  # pass a path, not an array, to process_video
                            sources=["upload"],
                        )
                        gr.Markdown("*Upload a photo of a person to prioritize clips featuring them.*")
                    with gr.Accordion("Custom Instructions (Optional)", open=False):
                        custom_prompt = gr.Textbox(
                            label="Additional Instructions",
                            placeholder="E.g., 'Focus on crowd reactions' or 'Prioritize action scenes'",
                            lines=2,
                        )
                    process_btn = gr.Button(
                        "Extract Highlights",
                        variant="primary",
                        size="lg"
                    )
                # Right column - Outputs
                with gr.Column(scale=1):
                    gr.Markdown("### Output")
                    status_output = gr.Textbox(
                        label="Status",
                        lines=2,
                        interactive=False
                    )
                    gr.Markdown("#### Extracted Clips")
                    # Three fixed slots; process_video returns None for unused ones
                    clip1_output = gr.Video(label="Clip 1", elem_classes=["output-video"])
                    clip2_output = gr.Video(label="Clip 2", elem_classes=["output-video"])
                    clip3_output = gr.Video(label="Clip 3", elem_classes=["output-video"])
                    with gr.Accordion("Processing Log", open=True):
                        log_output = gr.Textbox(
                            label="Log",
                            lines=10,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Accordion("Automated Metrics (System-Generated)", open=True):
                        metrics_output = gr.Textbox(
                            label="Metrics for Testing",
                            lines=20,
                            interactive=False,
                            show_copy_button=True,
                            info="Copy these metrics for evaluation spreadsheets"
                        )
            # Connect single video processing
            process_btn.click(
                fn=process_video,
                inputs=[
                    video_input,
                    domain_dropdown,
                    num_clips_slider,
                    duration_slider,
                    reference_image,
                    custom_prompt
                ],
                outputs=[
                    status_output,
                    clip1_output,
                    clip2_output,
                    clip3_output,
                    log_output,
                    metrics_output
                ],
                show_progress="full"
            )
        # =================================================================
        # Tab 2: Batch Testing
        # =================================================================
        with gr.TabItem("Batch Testing"):
            with gr.Row():
                # Left column - Configuration
                with gr.Column(scale=1):
                    gr.Markdown("### Batch Configuration")
                    batch_videos = gr.File(
                        label="Upload Video(s)",
                        file_count="multiple",
                        file_types=["video"],
                    )
                    gr.Markdown("#### Domains to Test")
                    batch_domains = gr.CheckboxGroup(
                        choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "General"],
                        value=["General"],
                        label="Select domains",
                    )
                    gr.Markdown("#### Clip Durations to Test")
                    batch_durations = gr.CheckboxGroup(
                        choices=[10, 15, 20, 30],
                        value=[15],
                        label="Select durations (seconds)",
                    )
                    batch_num_clips = gr.Slider(
                        minimum=1,
                        maximum=3,
                        value=3,
                        step=1,
                        label="Number of Clips per Test",
                    )
                    with gr.Accordion("Custom Prompts", open=True):
                        batch_no_prompt = gr.Checkbox(
                            label="Include no-prompt baseline",
                            value=True,
                            info="Test without any custom prompt for comparison"
                        )
                        batch_prompt1 = gr.Textbox(
                            label="Prompt 1",
                            placeholder="E.g., 'Focus on action moments'",
                            lines=1,
                        )
                        batch_prompt2 = gr.Textbox(
                            label="Prompt 2",
                            placeholder="E.g., 'Find crowd reactions'",
                            lines=1,
                        )
                        batch_prompt3 = gr.Textbox(
                            label="Prompt 3",
                            placeholder="E.g., 'Prioritize emotional moments'",
                            lines=1,
                        )
                    with gr.Accordion("Reference Image (Optional)", open=False):
                        batch_ref_image = gr.Image(
                            label="Reference Image (applies to all tests)",
                            type="filepath",
                            sources=["upload"],
                        )
                    # Queue size indicator (live-updated via calculate_queue_size)
                    queue_info = gr.Markdown("Queue: 0 tests")
                    with gr.Row():
                        batch_start_btn = gr.Button(
                            "Start Batch",
                            variant="primary",
                            size="lg"
                        )
                        batch_cancel_btn = gr.Button(
                            "Cancel",
                            variant="secondary",
                            size="lg"
                        )
                # Right column - Results
                with gr.Column(scale=1):
                    gr.Markdown("### Results")
                    batch_status = gr.Textbox(
                        label="Status",
                        lines=2,
                        interactive=False
                    )
                    batch_results_table = gr.Dataframe(
                        label="Test Results",
                        headers=["Test ID", "Video", "Domain", "Duration", "Prompt", "Status", "Time (s)", "Frames", "Hooks"],
                        interactive=False,
                    )
                    with gr.Accordion("Processing Log", open=True):
                        batch_log = gr.Textbox(
                            label="Log",
                            lines=15,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Accordion("Full Results (JSON)", open=False):
                        batch_json = gr.Textbox(
                            label="JSON Output",
                            lines=10,
                            interactive=False,
                            show_copy_button=True
                        )
                    gr.Markdown("#### Download Results")
                    with gr.Row():
                        csv_download = gr.File(label="CSV Results")
                        json_download = gr.File(label="JSON Results")
                        zip_download = gr.File(label="All Clips (ZIP)")
            # Update queue size whenever any batch input changes
            queue_inputs = [batch_videos, batch_domains, batch_durations, batch_no_prompt, batch_prompt1, batch_prompt2, batch_prompt3]
            for inp in queue_inputs:
                inp.change(
                    fn=calculate_queue_size,
                    inputs=queue_inputs,
                    outputs=queue_info
                )
            # Connect batch processing
            batch_start_btn.click(
                fn=run_batch_tests,
                inputs=[
                    batch_videos,
                    batch_domains,
                    batch_durations,
                    batch_num_clips,
                    batch_ref_image,
                    batch_no_prompt,
                    batch_prompt1,
                    batch_prompt2,
                    batch_prompt3,
                ],
                outputs=[
                    batch_status,
                    batch_results_table,
                    batch_log,
                    batch_json,
                    csv_download,
                    json_download,
                    zip_download,
                ],
                show_progress="full"
            )
            batch_cancel_btn.click(
                fn=cancel_batch,
                inputs=[],
                outputs=[batch_status]
            )
    gr.Markdown("""
    ---
    **ShortSmith v2** | Powered by Qwen2-VL, InsightFace, and Librosa |
    [GitHub](https://github.com) | Built with Gradio
    """)
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.queue() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_error=True | |
| ) | |
| else: | |
| # For HuggingFace Spaces | |
| demo.queue() | |
| demo.launch() | |