| """
|
| ShortSmith v3 - Gradio Application
|
|
|
| Hugging Face Space interface for video highlight extraction.
|
| Features:
|
| - Multi-modal analysis (visual + audio + motion)
|
| - Domain-optimized presets
|
| - Person-specific filtering (optional)
|
| - Scene-aware clip cutting
|
| - Batch testing with parameter variations
|
| """
|
|
|
| import os
|
| import sys
|
| import tempfile
|
| import shutil
|
| import json
|
| import zipfile
|
| from pathlib import Path
|
| import time
|
| import traceback
|
| from typing import List, Dict, Any, Optional
|
|
|
| import gradio as gr
|
| import pandas as pd
|
|
|
|
|
# Make sibling packages (utils/, pipeline/) importable when this file is run
# directly as a script (the Hugging Face Spaces entry point).
sys.path.insert(0, str(Path(__file__).parent))


# Prefer the project's structured logger; fall back to stdlib logging so the
# app can still start if utils.logger is missing or fails to import.
try:
    from utils.logger import setup_logging, get_logger
    setup_logging(log_level="INFO", log_to_console=True)
    logger = get_logger("app")
except Exception:
    import logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("app")
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_metrics_output(result, domain: str, custom_prompt: Optional[str] = None) -> str:
    """
    Build formatted metrics output for testing and evaluation.

    Produces a plain-text report with a whole-video section followed by one
    section per extracted clip, suitable for copy/paste into spreadsheets.

    Args:
        result: PipelineResult object
        domain: Content domain used for processing
        custom_prompt: Custom prompt used (if any)

    Returns:
        Formatted string with all metrics
    """
    lines = []
    lines.append("=" * 50)
    lines.append("AUTOMATED METRICS (System-Generated)")
    lines.append("=" * 50)
    lines.append("")

    # --- Whole-video processing statistics ---
    lines.append("PROCESSING METRICS")
    lines.append("-" * 30)
    lines.append(f"processing_time_seconds: {result.processing_time:.2f}")
    lines.append(f"frames_analyzed: {len(result.visual_features)}")
    lines.append(f"scenes_detected: {len(result.scenes)}")
    lines.append(f"audio_segments_analyzed: {len(result.audio_features)}")
    lines.append(f"domain: {domain}")
    lines.append(f"custom_prompt: {custom_prompt if custom_prompt else 'none'}")

    # A "hook" is any scored segment whose combined score exceeds 0.7
    # (same threshold used per clip below).
    hooks_detected = sum(1 for s in result.scores if s.combined_score > 0.7) if result.scores else 0
    lines.append(f"hooks_detected: {hooks_detected}")

    if result.metadata:
        lines.append(f"video_duration_seconds: {result.metadata.duration:.2f}")
        lines.append(f"video_resolution: {result.metadata.resolution}")
        lines.append(f"video_fps: {result.metadata.fps:.2f}")

    lines.append("")

    # --- Per-clip statistics ---
    lines.append("PER CLIP METRICS")
    lines.append("-" * 30)

    for i, clip in enumerate(result.clips):
        lines.append("")
        lines.append(f"[Clip {i + 1}]")
        lines.append(f"  clip_id: {i + 1}")
        lines.append(f"  start_time: {clip.start_time:.2f}")
        lines.append(f"  end_time: {clip.end_time:.2f}")
        lines.append(f"  duration: {clip.duration:.2f}")
        lines.append(f"  hype_score: {clip.hype_score:.4f}")
        lines.append(f"  visual_score: {clip.visual_score:.4f}")
        lines.append(f"  audio_score: {clip.audio_score:.4f}")
        lines.append(f"  motion_score: {clip.motion_score:.4f}")

        # Match this clip to its scored segment (start within 1s) and label
        # the hook by the dominant modality. First matching segment wins.
        # NOTE(review): classification assumed to apply only when
        # combined_score > 0.7 — confirm against the pipeline's scorer.
        hook_type = "none"
        hook_confidence = 0.0

        for score in result.scores:
            if abs(score.start_time - clip.start_time) < 1.0:
                if score.combined_score > 0.7:
                    hook_confidence = score.combined_score

                    if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
                        hook_type = "audio_peak"
                    elif score.motion_score > score.visual_score:
                        hook_type = "motion_spike"
                    else:
                        hook_type = "visual_highlight"
                break

        lines.append(f"  hook_type: {hook_type}")
        lines.append(f"  hook_confidence: {hook_confidence:.4f}")

        # Person-filtering stats only appear when a reference image matched.
        if clip.person_detected:
            lines.append(f"  person_detected: True")
            lines.append(f"  person_screen_time: {clip.person_screen_time:.4f}")

    lines.append("")
    lines.append("=" * 50)
    lines.append("END METRICS")
    lines.append("=" * 50)

    return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_video(
    video_file,
    domain,
    num_clips,
    clip_length,
    reference_image,
    custom_prompt,
    progress=gr.Progress()
):
    """
    Main video processing function for single video mode.

    Args:
        video_file: Uploaded video file path
        domain: Content domain for scoring weights (UI label, e.g. "Sports")
        num_clips: Number of clips to extract (may arrive as float from slider)
        clip_length: Clip length preset ("Short (30-60s)" or "Long (1-3 min)")
        reference_image: Optional reference image path for person filtering
        custom_prompt: Optional custom instructions
        progress: Gradio progress tracker

    Returns:
        4-tuple of (status_message, clips_html, log_text, metrics_text),
        matching the four registered output components
        (status_output, clips_output, log_output, metrics_output).
        Previously some paths returned 5 or 6 values, which mismatched the
        registered outputs; every path now returns exactly 4.
    """
    if video_file is None:
        return "Please upload a video first.", "", "", ""

    log_messages = []

    def log(msg):
        # Keep a timestamped copy for the UI log panel; mirror to the logger.
        log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
        logger.info(msg)

    try:
        video_path = Path(video_file)
        log(f"Processing video: {video_path.name}")
        progress(0.05, desc="Validating video...")

        # Deferred imports: heavyweight pipeline modules load only on demand.
        from utils.helpers import validate_video_file, validate_image_file, format_duration
        from pipeline.orchestrator import PipelineOrchestrator

        validation = validate_video_file(video_file)
        if not validation.is_valid:
            return f"Error: {validation.error_message}", "", "\n".join(log_messages), ""

        log(f"Video size: {validation.file_size / (1024*1024):.1f} MB")

        # Hidden test hook: a magic prompt short-circuits the pipeline and
        # renders mock clips so the UI can be exercised without processing.
        if custom_prompt and custom_prompt.strip().lower() == "test@akatsuki":
            log("TEST MODE: Skipping processing, showing mock highlights")
            progress(0.5, desc="Generating test highlights...")

            clips_html = ""
            for i in range(int(num_clips)):
                clips_html += f'''
<div style="margin-bottom: 20px; border: 1px solid #ccc; border-radius: 8px; padding: 15px; background-color: #0b1619;">
    <h4 style="margin-top: 0; color: #f2fafc;">Highlight Clip {i+1}</h4>
    <div style="margin-bottom: 10px; font-size: 14px; color: #f2fafc;">
        <strong>Time:</strong> {i*30}-{i*30+45}s |
        <strong>Score:</strong> {95-i*5:.1f} |
        <strong>Domain:</strong> {domain}
    </div>
    <video width="100%" controls style="border-radius: 4px;">
        <source src="{video_file}" type="video/mp4">
    </video>
</div>
'''

            status = f"TEST MODE: Generated {int(num_clips)} mock highlight clips from {Path(video_file).name}"
            metrics_text = f"""
Test Mode Metrics:
- Domain: {domain}
- Number of Clips: {int(num_clips)}
- Clip Length: {clip_length}
- Processing Time: 2.3s (simulated)
- Total Video Duration: 5:42
- Highlights Extracted: {int(num_clips)}
""".strip()

            progress(1.0, desc="Test complete!")
            return status, clips_html, "\n".join(log_messages), metrics_text

        # Optional person filtering: only pass the reference image through
        # if it validates; otherwise warn and continue without it.
        ref_path = None
        if reference_image is not None:
            ref_validation = validate_image_file(reference_image)
            if ref_validation.is_valid:
                ref_path = reference_image
                log(f"Reference image: {Path(reference_image).name}")
            else:
                log(f"Warning: Invalid reference image - {ref_validation.error_message}")

        # Map the UI label to the pipeline's domain identifier.
        domain_map = {
            "Sports": "sports",
            "Vlogs": "vlogs",
            "Music Videos": "music",
            "Podcasts": "podcasts",
            "Gaming": "gaming",
            "Comedy": "comedy",
            "General": "general",
        }
        domain_value = domain_map.get(domain, "general")
        log(f"Domain: {domain_value}")

        clip_length_value = "short" if clip_length == "Short (30-60s)" else "long"
        log(f"Clip length: {clip_length_value}")

        # Clips are copied here so they survive pipeline.cleanup().
        output_dir = Path(tempfile.mkdtemp(prefix="shortsmith_output_"))
        log(f"Output directory: {output_dir}")

        def on_progress(pipeline_progress):
            # Relay pipeline progress into the Gradio bar; pipeline [0, 1]
            # is mapped into the [0.1, 0.9] band reserved for analysis.
            stage = pipeline_progress.stage.value
            pct = pipeline_progress.progress
            msg = pipeline_progress.message
            log(f"[{stage}] {msg}")

            mapped_progress = 0.1 + (pct * 0.8)
            progress(mapped_progress, desc=f"{stage}: {msg}")

        progress(0.1, desc="Initializing AI models...")
        log("Initializing pipeline...")
        pipeline = PipelineOrchestrator(progress_callback=on_progress)

        progress(0.15, desc="Starting analysis...")
        log(f"Processing: {int(num_clips)} clips, length={clip_length_value}")

        result = pipeline.process(
            video_path=video_path,
            num_clips=int(num_clips),
            clip_length=clip_length_value,
            domain=domain_value,
            reference_image=ref_path,
            custom_prompt=custom_prompt.strip() if custom_prompt else None,
        )

        progress(0.9, desc="Extracting clips...")

        if result.success:
            log(f"Processing complete in {result.processing_time:.1f}s")

            # Keep (clip, copied_path) pairs together so metadata and file
            # stay aligned even if some clip file is missing. (Previously
            # clip_paths was indexed against result.clips, which misaligned
            # when a clip file did not exist.)
            extracted = []
            for i, clip in enumerate(result.clips):
                if clip.clip_path.exists():
                    output_path = output_dir / f"highlight_{i+1}.mp4"
                    shutil.copy2(clip.clip_path, output_path)
                    extracted.append((clip, str(output_path)))
                    log(f"Clip {i+1}: {format_duration(clip.start_time)} - {format_duration(clip.end_time)} (score: {clip.hype_score:.2f})")

            status = f"Successfully extracted {len(extracted)} highlight clips!\nProcessing time: {result.processing_time:.1f}s"

            metrics_text = build_metrics_output(result, domain_value, custom_prompt.strip() if custom_prompt else None)

            pipeline.cleanup()
            progress(1.0, desc="Done!")

            clips_html = ""
            for i, (clip, clip_path) in enumerate(extracted):
                clips_html += f'''
<div style="margin-bottom: 20px; border: 1px solid #ddd; border-radius: 8px; padding: 15px; background-color: #f9f9f9;">
    <h4 style="margin-top: 0; color: #2e7d32;">Highlight Clip {i+1}</h4>
    <div style="margin-bottom: 10px; font-size: 14px; color: #666;">
        <strong>Time:</strong> {format_duration(clip.start_time)} - {format_duration(clip.end_time)} |
        <strong>Score:</strong> {clip.hype_score:.2f} |
        <strong>Domain:</strong> {domain_value}
    </div>
    <video width="100%" controls style="border-radius: 4px;">
        <source src="{clip_path}" type="video/mp4">
    </video>
</div>
'''

            return status, clips_html, "\n".join(log_messages), metrics_text
        else:
            log(f"Processing failed: {result.error_message}")
            pipeline.cleanup()
            return f"Error: {result.error_message}", "", "\n".join(log_messages), ""

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log(error_msg)
        log(traceback.format_exc())
        logger.exception("Pipeline error")
        return error_msg, "", "\n".join(log_messages), ""
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_test_queue(
    videos: List[str],
    domains: List[str],
    clip_lengths: List[str],
    num_clips: int,
    ref_image: Optional[str],
    prompts: List[str],
    include_no_prompt: bool
) -> List[Dict[str, Any]]:
    """Generate all parameter combinations to test (cartesian product)."""
    # Build the prompt axis: the no-prompt baseline first (if requested),
    # then every non-blank user prompt, stripped. An empty axis collapses
    # to a single no-prompt entry so the product is never empty.
    prompt_axis: List[Optional[str]] = [None] if include_no_prompt else []
    for raw in prompts:
        if raw and raw.strip():
            prompt_axis.append(raw.strip())
    if not prompt_axis:
        prompt_axis = [None]

    # UI label -> pipeline domain identifier.
    ui_to_domain = {
        "Sports": "sports",
        "Vlogs": "vlogs",
        "Music Videos": "music",
        "Podcasts": "podcasts",
        "Gaming": "gaming",
        "Comedy": "comedy",
        "General": "general",
    }

    # UI label -> pipeline clip-length identifier.
    ui_to_length = {
        "Short (30-60s)": "short",
        "Long (1-3 min)": "long",
    }

    combos: List[Dict[str, Any]] = []
    next_id = 1
    # Nesting order fixes the execution order: video > domain > length > prompt.
    for video in videos:
        display_name = Path(video).name if video else "unknown"
        for ui_domain in domains:
            mapped_domain = ui_to_domain.get(ui_domain, "general")
            for ui_length in clip_lengths:
                mapped_length = ui_to_length.get(ui_length, "short")
                for prompt in prompt_axis:
                    combos.append({
                        "test_id": next_id,
                        "video_path": video,
                        "video_name": display_name,
                        "domain": ui_domain,
                        "domain_value": mapped_domain,
                        "clip_length": ui_length,
                        "clip_length_value": mapped_length,
                        "num_clips": num_clips,
                        "reference_image": ref_image,
                        "custom_prompt": prompt,
                    })
                    next_id += 1
    return combos
|
|
|
|
|
def run_single_batch_test(config: Dict[str, Any], output_base_dir: Path) -> Dict[str, Any]:
    """Run a single test from the batch queue.

    Args:
        config: One entry produced by generate_test_queue.
        output_base_dir: Root directory; this test's clips are copied into a
            per-configuration subfolder beneath it.

    Returns:
        Result dict with configuration, status, metrics and clip info.
        status stays "failed" (with "error" set) unless the pipeline
        reports success; exceptions are captured, never propagated.
    """
    # Deferred imports: heavyweight pipeline modules load only when a test runs.
    from utils.helpers import validate_video_file
    from pipeline.orchestrator import PipelineOrchestrator

    test_id = config["test_id"]
    video_path = config["video_path"]
    video_name = config["video_name"]
    domain_value = config["domain_value"]
    clip_length = config["clip_length"]
    clip_length_value = config["clip_length_value"]
    num_clips = config["num_clips"]
    ref_image = config["reference_image"]
    custom_prompt = config["custom_prompt"]

    # Per-configuration output folder; the hash keeps arbitrary prompt text
    # out of filesystem paths while still distinguishing prompts.
    prompt_suffix = "no_prompt" if not custom_prompt else f"prompt_{hash(custom_prompt) % 1000}"
    test_folder = f"{Path(video_name).stem}_{domain_value}_{clip_length_value}_{prompt_suffix}"
    output_dir = output_base_dir / test_folder
    output_dir.mkdir(parents=True, exist_ok=True)

    # Pre-populated "failed" skeleton; fields are overwritten on success so
    # every exit path returns a uniformly-shaped dict.
    result_data = {
        "test_id": test_id,
        "video_name": video_name,
        "domain": domain_value,
        "clip_length": clip_length,
        "custom_prompt": custom_prompt if custom_prompt else "none",
        "num_clips": num_clips,
        "status": "failed",
        "error": None,
        "processing_time": 0,
        "frames_analyzed": 0,
        "scenes_detected": 0,
        "hooks_detected": 0,
        "clips": [],
        "clip_paths": [],
    }

    try:
        validation = validate_video_file(video_path)
        if not validation.is_valid:
            result_data["error"] = validation.error_message
            return result_data

        pipeline = PipelineOrchestrator()
        result = pipeline.process(
            video_path=video_path,
            num_clips=num_clips,
            clip_length=clip_length_value,
            domain=domain_value,
            reference_image=ref_image,
            custom_prompt=custom_prompt,
        )

        if result.success:
            result_data["status"] = "success"
            result_data["processing_time"] = round(result.processing_time, 2)
            result_data["frames_analyzed"] = len(result.visual_features)
            result_data["scenes_detected"] = len(result.scenes)
            # A "hook" is any scored segment whose combined score exceeds 0.7.
            result_data["hooks_detected"] = sum(1 for s in result.scores if s.combined_score > 0.7) if result.scores else 0

            for i, clip in enumerate(result.clips):
                # Copy the clip file out of the pipeline workspace so it
                # survives pipeline.cleanup().
                if clip.clip_path.exists():
                    clip_output = output_dir / f"clip_{i+1}.mp4"
                    shutil.copy2(clip.clip_path, clip_output)
                    result_data["clip_paths"].append(str(clip_output))

                # Match the clip to its scored segment (start within 1s) and
                # label the hook by the dominant modality; first match wins.
                # NOTE(review): classification assumed to apply only when
                # combined_score > 0.7 — confirm against the pipeline scorer.
                hook_type = "none"
                hook_confidence = 0.0
                for score in result.scores:
                    if abs(score.start_time - clip.start_time) < 1.0:
                        if score.combined_score > 0.7:
                            hook_confidence = score.combined_score
                            if score.audio_score > score.visual_score and score.audio_score > score.motion_score:
                                hook_type = "audio_peak"
                            elif score.motion_score > score.visual_score:
                                hook_type = "motion_spike"
                            else:
                                hook_type = "visual_highlight"
                        break

                result_data["clips"].append({
                    "clip_id": i + 1,
                    "start_time": round(clip.start_time, 2),
                    "end_time": round(clip.end_time, 2),
                    "duration": round(clip.duration, 2),
                    "hype_score": round(clip.hype_score, 4),
                    "visual_score": round(clip.visual_score, 4),
                    "audio_score": round(clip.audio_score, 4),
                    "motion_score": round(clip.motion_score, 4),
                    "hook_type": hook_type,
                    "hook_confidence": round(hook_confidence, 4),
                })
        else:
            result_data["error"] = result.error_message

        pipeline.cleanup()

    except Exception as e:
        result_data["error"] = str(e)
        logger.exception(f"Batch test {test_id} failed")

    return result_data
|
|
|
|
|
def results_to_dataframe(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """Convert batch results to a pandas DataFrame for display."""
    table_rows = []
    for entry in results:
        # Truncate long prompts so the table column stays narrow.
        prompt_text = entry["custom_prompt"]
        if len(prompt_text) > 20:
            prompt_text = prompt_text[:20] + "..."

        record = {
            "Test ID": entry["test_id"],
            "Video": entry["video_name"],
            "Domain": entry["domain"],
            "Length": entry["clip_length"],
            "Prompt": prompt_text,
            "Status": entry["status"],
            "Time (s)": entry["processing_time"],
            "Frames": entry["frames_analyzed"],
            "Hooks": entry["hooks_detected"],
        }

        # At most three clip hype scores fit in the display table.
        for slot, clip_info in enumerate(entry.get("clips", [])[:3]):
            record[f"Clip {slot+1} Hype"] = clip_info.get("hype_score", 0)

        table_rows.append(record)

    return pd.DataFrame(table_rows)
|
|
|
|
|
def results_to_csv(results: List[Dict[str, Any]]) -> str:
    """Convert results to CSV format."""
    records = []
    for entry in results:
        record = {
            "test_id": entry["test_id"],
            "video_name": entry["video_name"],
            "domain": entry["domain"],
            "clip_length": entry["clip_length"],
            "custom_prompt": entry["custom_prompt"],
            "num_clips": entry["num_clips"],
            "status": entry["status"],
            "error": entry.get("error", ""),
            "processing_time": entry["processing_time"],
            "frames_analyzed": entry["frames_analyzed"],
            "scenes_detected": entry["scenes_detected"],
            "hooks_detected": entry["hooks_detected"],
        }

        # Always emit exactly three clip column groups so every row has the
        # same schema; missing clips get empty strings.
        clips = entry.get("clips", [])
        for slot in range(3):
            clip = clips[slot] if slot < len(clips) else None
            prefix = f"clip_{slot+1}"
            record[f"{prefix}_start"] = clip["start_time"] if clip else ""
            record[f"{prefix}_end"] = clip["end_time"] if clip else ""
            record[f"{prefix}_hype"] = clip["hype_score"] if clip else ""
            record[f"{prefix}_visual"] = clip["visual_score"] if clip else ""
            record[f"{prefix}_audio"] = clip["audio_score"] if clip else ""
            record[f"{prefix}_motion"] = clip["motion_score"] if clip else ""
            record[f"{prefix}_hook_type"] = clip["hook_type"] if clip else ""

        records.append(record)

    return pd.DataFrame(records).to_csv(index=False)
|
|
|
|
|
def results_to_json(results: List[Dict[str, Any]]) -> str:
    """Convert results to JSON format."""
    # Drop local clip file paths from the export: they are temp-dir paths
    # that are meaningless outside this machine. Everything else passes
    # through unchanged (shallow copy, same as the original copy/pop).
    sanitized = [
        {key: value for key, value in entry.items() if key != "clip_paths"}
        for entry in results
    ]
    return json.dumps(sanitized, indent=2)
|
|
|
|
|
def create_clips_zip(results: List[Dict[str, Any]]) -> Optional[str]:
    """Create a ZIP file of all extracted clips."""
    archive = Path(tempfile.mkdtemp()) / "batch_clips.zip"

    # One archive subfolder per successful test configuration; clips from
    # failed tests are skipped entirely.
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for entry in results:
            if entry["status"] != "success":
                continue
            subdir = f"{Path(entry['video_name']).stem}_{entry['domain']}_{entry['clip_length']}"
            if entry["custom_prompt"] != "none":
                subdir += "_prompt"
            for clip_file in entry.get("clip_paths", []):
                source = Path(clip_file)
                if source.exists():
                    bundle.write(clip_file, f"{subdir}/{source.name}")

    # Opening in 'w' mode creates the file, so this is None only on failure.
    return str(archive) if archive.exists() else None
|
|
|
|
|
|
|
# Mutable module-level state shared between run_batch_tests (the worker) and
# cancel_batch (the UI handler). Gradio invokes these from separate requests,
# so cancellation is signalled cooperatively via the should_cancel flag.
batch_state = {
    "is_running": False,     # True while a batch loop is executing
    "should_cancel": False,  # set by cancel_batch; polled between tests
    "results": [],           # per-test result dicts, appended as tests finish
    "output_dir": None,      # Path of the temp dir holding this batch's clips
}
|
|
|
|
|
def run_batch_tests(
    videos,
    domains,
    clip_lengths,
    num_clips,
    reference_image,
    include_no_prompt,
    prompt1,
    prompt2,
    prompt3,
    progress=gr.Progress()
):
    """Main batch testing function.

    Runs every combination produced by generate_test_queue sequentially,
    accumulating results in module-level batch_state so that cancel_batch
    (a separate Gradio handler) can interrupt between tests.

    Returns:
        7-tuple matching the registered outputs: (status message, results
        DataFrame, log text, JSON text, CSV file path, JSON file path,
        clips ZIP path or None).
    """
    global batch_state

    # Input validation: each early return supplies all 7 output slots.
    if not videos:
        return "Please upload at least one video.", None, "", "", None, None, None

    if not domains:
        return "Please select at least one domain.", None, "", "", None, None, None

    if not clip_lengths:
        return "Please select at least one clip length.", None, "", "", None, None, None

    # Collect the non-blank prompt textboxes.
    prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]

    queue = generate_test_queue(
        videos=videos,
        domains=domains,
        clip_lengths=clip_lengths,
        num_clips=int(num_clips),
        ref_image=reference_image,
        prompts=prompts,
        include_no_prompt=include_no_prompt,
    )

    if not queue:
        return "No tests to run. Please check your configuration.", None, "", "", None, None, None

    # Reset the shared state for this run; cancel_batch flips should_cancel.
    batch_state["is_running"] = True
    batch_state["should_cancel"] = False
    batch_state["results"] = []
    batch_state["output_dir"] = Path(tempfile.mkdtemp(prefix="shortsmith_batch_"))

    total_tests = len(queue)
    log_messages = []

    def log(msg):
        # Timestamped UI log plus a mirror to the module logger.
        log_messages.append(f"[{time.strftime('%H:%M:%S')}] {msg}")
        logger.info(msg)

    log(f"Starting batch testing: {total_tests} tests")
    log(f"Videos: {len(videos)}, Domains: {len(domains)}, Lengths: {len(clip_lengths)}, Prompts: {len(prompts) + (1 if include_no_prompt else 0)}")

    for i, test_config in enumerate(queue):
        # Cancellation is only honored between tests; a running test finishes.
        if batch_state["should_cancel"]:
            log("Batch cancelled by user")
            break

        test_id = test_config["test_id"]
        video_name = test_config["video_name"]
        domain = test_config["domain_value"]
        clip_length = test_config["clip_length"]
        prompt = test_config["custom_prompt"] or "no-prompt"

        log(f"[{i+1}/{total_tests}] Testing: {video_name} | {domain} | {clip_length} | {prompt[:30]}...")
        progress((i + 1) / total_tests, desc=f"Test {i+1}/{total_tests}: {video_name}")

        result = run_single_batch_test(test_config, batch_state["output_dir"])
        batch_state["results"].append(result)

        if result["status"] == "success":
            log(f"  ✓ Completed in {result['processing_time']}s")
        else:
            log(f"  ✗ Failed: {result.get('error', 'Unknown error')}")

    batch_state["is_running"] = False
    completed = len([r for r in batch_state["results"] if r["status"] == "success"])
    failed = len([r for r in batch_state["results"] if r["status"] == "failed"])

    log(f"Batch complete: {completed} succeeded, {failed} failed")

    # Build the three export artifacts from whatever completed (the batch may
    # have been cancelled partway through).
    results_df = results_to_dataframe(batch_state["results"])
    csv_content = results_to_csv(batch_state["results"])
    json_content = results_to_json(batch_state["results"])

    # Persist CSV/JSON next to the clips so gr.File can serve them.
    csv_path = batch_state["output_dir"] / "results.csv"
    json_path = batch_state["output_dir"] / "results.json"
    csv_path.write_text(csv_content)
    json_path.write_text(json_content)

    zip_path = create_clips_zip(batch_state["results"])

    status = f"Batch complete: {completed}/{total_tests} tests succeeded"

    return (
        status,
        results_df,
        "\n".join(log_messages),
        json_content,
        str(csv_path),
        str(json_path),
        zip_path,
    )
|
|
|
|
|
def cancel_batch():
    """Cancel the running batch."""
    # Cooperative cancellation: only set the flag. run_batch_tests polls it
    # between tests, so the currently running test is allowed to finish.
    batch_state["should_cancel"] = True
    return "Cancelling batch... (will stop after current test completes)"
|
|
|
|
|
| def calculate_queue_size(videos, domains, clip_lengths, include_no_prompt, prompt1, prompt2, prompt3):
|
| """Calculate and display the queue size."""
|
| num_videos = len(videos) if videos else 0
|
| num_domains = len(domains) if domains else 0
|
| num_lengths = len(clip_lengths) if clip_lengths else 0
|
|
|
| prompts = [p for p in [prompt1, prompt2, prompt3] if p and p.strip()]
|
| num_prompts = len(prompts) + (1 if include_no_prompt else 0)
|
| if num_prompts == 0:
|
| num_prompts = 1
|
|
|
| total = num_videos * num_domains * num_lengths * num_prompts
|
|
|
| return f"Queue: {num_videos} video(s) × {num_domains} domain(s) × {num_lengths} length(s) × {num_prompts} prompt(s) = **{total} tests**"
|
|
|
|
|
def generate_clip_preview(num_clips):
    """Generate HTML placeholder cards for the expected clips.

    Args:
        num_clips: Number of clips selected. Gradio sliders deliver floats,
            so the value is coerced to int before use (previously
            range(num_clips) raised TypeError on float input).

    Returns:
        HTML string with one placeholder card per clip, or a short
        "nothing to preview" paragraph when num_clips <= 0.
    """
    # gr.Slider change events pass numeric values that may be floats.
    count = int(num_clips)
    if count <= 0:
        return "<p>No clips to preview</p>"

    preview_html = '<div style="display: flex; flex-wrap: wrap; gap: 10px; margin: 10px 0;">'

    # Fix: colors were written as '##f2fafc' / '##a1a5a6' (double hash),
    # which is invalid CSS and was silently ignored by browsers. The single
    # '#' form matches the palette used elsewhere in this file.
    for i in range(count):
        preview_html += f'''
<div style="border: 2px dashed #ccc; border-radius: 8px; padding: 20px; text-align: center; width: 150px; background: #0b1619;">
    <div style="font-size: 48px; color: #f2fafc; margin-bottom: 10px;">🎥</div>
    <div style="font-weight: bold; color: #f2fafc;">Clip {i+1}</div>
    <div style="font-size: 12px; color: #a1a5a6;">Processing...</div>
</div>
'''

    preview_html += '</div>'
    return preview_html
|
|
|
|
|
|
|
|
|
|
|
|
|
| with gr.Blocks(
|
| title="ShortSmith v3",
|
| theme=gr.themes.Soft(),
|
| css="""
|
| .container { max-width: 1200px; margin: auto; }
|
| .output-video { min-height: 200px; }
|
| """
|
| ) as demo:
|
|
|
| gr.Markdown("""
|
| # ShortSmith v3
|
| ### AI-Powered Video Highlight Extractor
|
|
|
| Upload a video and automatically extract the most engaging highlight clips using AI analysis.
|
| """)
|
|
|
| with gr.Tabs():
|
|
|
|
|
|
|
| with gr.TabItem("Single Video"):
|
| with gr.Row():
|
|
|
| with gr.Column(scale=1):
|
| gr.Markdown("### Input")
|
|
|
| video_input = gr.Video(
|
| label="Upload Video",
|
| sources=["upload"],
|
| )
|
|
|
| with gr.Accordion("Settings", open=True):
|
| domain_dropdown = gr.Dropdown(
|
| choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "Comedy", "General"],
|
| value="General",
|
| label="Content Domain",
|
| info="Select the type of content for optimized scoring"
|
| )
|
|
|
| with gr.Row():
|
| num_clips_slider = gr.Slider(
|
| minimum=1,
|
| maximum=5,
|
| value=3,
|
| step=1,
|
| label="Number of Clips",
|
| info="How many highlight clips to extract"
|
| )
|
| clip_length_radio = gr.Radio(
|
| choices=["Short (30-60s)", "Long (1-3 min)"],
|
| value="Short (30-60s)",
|
| label="Clip Length",
|
| info="Short clips for social media, long clips for YouTube"
|
| )
|
|
|
| with gr.Accordion("Person Filtering (Optional)", open=False):
|
| reference_image = gr.Image(
|
| label="Reference Image",
|
| type="filepath",
|
| sources=["upload"],
|
| )
|
| gr.Markdown("*Upload a photo of a person to prioritize clips featuring them.*")
|
|
|
| with gr.Accordion("Custom Instructions (Optional)", open=False):
|
| custom_prompt = gr.Textbox(
|
| label="Additional Instructions",
|
| placeholder="E.g., 'Focus on crowd reactions' or 'Prioritize action scenes'",
|
| lines=2,
|
| )
|
|
|
| process_btn = gr.Button(
|
| "Extract Highlights",
|
| variant="primary",
|
| size="lg"
|
| )
|
|
|
|
|
| with gr.Column(scale=1):
|
| gr.Markdown("### Output")
|
|
|
| status_output = gr.Textbox(
|
| label="Status",
|
| lines=2,
|
| interactive=False
|
| )
|
|
|
| gr.Markdown("#### Extracted Clips")
|
| clips_output = gr.HTML(label="Extracted Clips")
|
|
|
| with gr.Accordion("Processing Log", open=True):
|
| log_output = gr.Textbox(
|
| label="Log",
|
| lines=10,
|
| interactive=False,
|
| show_copy_button=True
|
| )
|
|
|
| with gr.Accordion("Automated Metrics (System-Generated)", open=True):
|
| metrics_output = gr.Textbox(
|
| label="Metrics for Testing",
|
| lines=20,
|
| interactive=False,
|
| show_copy_button=True,
|
| info="Copy these metrics for evaluation spreadsheets"
|
| )
|
|
|
|
|
| process_btn.click(
|
| fn=process_video,
|
| inputs=[
|
| video_input,
|
| domain_dropdown,
|
| num_clips_slider,
|
| clip_length_radio,
|
| reference_image,
|
| custom_prompt
|
| ],
|
| outputs=[
|
| status_output,
|
| clips_output,
|
| log_output,
|
| metrics_output
|
| ],
|
| show_progress="full"
|
| )
|
|
|
|
|
| num_clips_slider.change(
|
| fn=generate_clip_preview,
|
| inputs=[num_clips_slider],
|
| outputs=[clips_output]
|
| )
|
|
|
|
|
| demo.load(
|
| fn=lambda: generate_clip_preview(3),
|
| inputs=[],
|
| outputs=[clips_output]
|
| )
|
|
|
|
|
|
|
|
|
| with gr.TabItem("Batch Testing"):
|
| with gr.Row():
|
|
|
| with gr.Column(scale=1):
|
| gr.Markdown("### Batch Configuration")
|
|
|
| batch_videos = gr.File(
|
| label="Upload Video(s)",
|
| file_count="multiple",
|
| file_types=["video"],
|
| )
|
|
|
| gr.Markdown("#### Domains to Test")
|
| batch_domains = gr.CheckboxGroup(
|
| choices=["Sports", "Vlogs", "Music Videos", "Podcasts", "Gaming", "Comedy", "General"],
|
| value=["General"],
|
| label="Select domains",
|
| )
|
|
|
| gr.Markdown("#### Clip Lengths to Test")
|
| batch_clip_lengths = gr.CheckboxGroup(
|
| choices=["Short (30-60s)", "Long (1-3 min)"],
|
| value=["Short (30-60s)"],
|
| label="Select clip lengths",
|
| )
|
|
|
| batch_num_clips = gr.Slider(
|
| minimum=1,
|
| maximum=5,
|
| value=3,
|
| step=1,
|
| label="Number of Clips per Test",
|
| )
|
|
|
| with gr.Accordion("Custom Prompts", open=True):
|
| batch_no_prompt = gr.Checkbox(
|
| label="Include no-prompt baseline",
|
| value=True,
|
| info="Test without any custom prompt for comparison"
|
| )
|
| batch_prompt1 = gr.Textbox(
|
| label="Prompt 1",
|
| placeholder="E.g., 'Focus on action moments'",
|
| lines=1,
|
| )
|
| batch_prompt2 = gr.Textbox(
|
| label="Prompt 2",
|
| placeholder="E.g., 'Find crowd reactions'",
|
| lines=1,
|
| )
|
| batch_prompt3 = gr.Textbox(
|
| label="Prompt 3",
|
| placeholder="E.g., 'Prioritize emotional moments'",
|
| lines=1,
|
| )
|
|
|
| with gr.Accordion("Reference Image (Optional)", open=False):
|
| batch_ref_image = gr.Image(
|
| label="Reference Image (applies to all tests)",
|
| type="filepath",
|
| sources=["upload"],
|
| )
|
|
|
|
|
| queue_info = gr.Markdown("Queue: 0 tests")
|
|
|
| with gr.Row():
|
| batch_start_btn = gr.Button(
|
| "Start Batch",
|
| variant="primary",
|
| size="lg"
|
| )
|
| batch_cancel_btn = gr.Button(
|
| "Cancel",
|
| variant="secondary",
|
| size="lg"
|
| )
|
|
|
|
|
| with gr.Column(scale=1):
|
| gr.Markdown("### Results")
|
|
|
| batch_status = gr.Textbox(
|
| label="Status",
|
| lines=2,
|
| interactive=False
|
| )
|
|
|
| batch_results_table = gr.Dataframe(
|
| label="Test Results",
|
| headers=["Test ID", "Video", "Domain", "Length", "Prompt", "Status", "Time (s)", "Frames", "Hooks"],
|
| interactive=False,
|
| )
|
|
|
| with gr.Accordion("Processing Log", open=True):
|
| batch_log = gr.Textbox(
|
| label="Log",
|
| lines=15,
|
| interactive=False,
|
| show_copy_button=True
|
| )
|
|
|
| with gr.Accordion("Full Results (JSON)", open=False):
|
| batch_json = gr.Textbox(
|
| label="JSON Output",
|
| lines=10,
|
| interactive=False,
|
| show_copy_button=True
|
| )
|
|
|
| gr.Markdown("#### Download Results")
|
| with gr.Row():
|
| csv_download = gr.File(label="CSV Results")
|
| json_download = gr.File(label="JSON Results")
|
| zip_download = gr.File(label="All Clips (ZIP)")
|
|
|
|
|
| queue_inputs = [batch_videos, batch_domains, batch_clip_lengths, batch_no_prompt, batch_prompt1, batch_prompt2, batch_prompt3]
|
| for inp in queue_inputs:
|
| inp.change(
|
| fn=calculate_queue_size,
|
| inputs=queue_inputs,
|
| outputs=queue_info
|
| )
|
|
|
|
|
| batch_start_btn.click(
|
| fn=run_batch_tests,
|
| inputs=[
|
| batch_videos,
|
| batch_domains,
|
| batch_clip_lengths,
|
| batch_num_clips,
|
| batch_ref_image,
|
| batch_no_prompt,
|
| batch_prompt1,
|
| batch_prompt2,
|
| batch_prompt3,
|
| ],
|
| outputs=[
|
| batch_status,
|
| batch_results_table,
|
| batch_log,
|
| batch_json,
|
| csv_download,
|
| json_download,
|
| zip_download,
|
| ],
|
| show_progress="full"
|
| )
|
|
|
| batch_cancel_btn.click(
|
| fn=cancel_batch,
|
| inputs=[],
|
| outputs=[batch_status]
|
| )
|
|
|
| gr.Markdown("""
|
| ---
|
| **ShortSmith v3** | Powered by Qwen2-VL, InsightFace, and Librosa |
|
| [GitHub](https://github.com) | Built with Gradio
|
| """)
|
|
|
|
|
| if __name__ == "__main__":
|
| demo.queue()
|
| demo.launch(
|
| server_name="0.0.0.0",
|
| server_port=7860,
|
| show_error=True
|
| )
|
| else:
|
|
|
| demo.queue()
|
| demo.launch()
|
|
|