#!/usr/bin/env python3
"""
HuggingFace Segment-Based Video Highlights Generator
Based on HuggingFace's SmolVLM2-HighlightGenerator approach
Optimized for HuggingFace Spaces with 256M model for resource efficiency
"""
import os
import sys
import argparse
import json
import subprocess
import tempfile
import re
import logging
from pathlib import Path
from typing import List, Dict, Tuple

# Add src directory to path for imports
sys.path.append(str(Path(__file__).parent / "src"))
try:
    from src.smolvlm2_handler import SmolVLM2Handler
except ImportError:
    print("❌ SmolVLM2Handler not found. Make sure to install dependencies first.")
    sys.exit(1)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HuggingFaceVideoHighlightDetector:
    """
    HuggingFace Segment-Based Video Highlight Detection
    Uses fixed-length segments for consistent AI classification
    """
    def __init__(self, model_name: str = "HuggingFaceTB/SmolVLM2-256M-Video-Instruct", device: str = "auto"):
        """Initialize with a SmolVLM2 model (256M default for Spaces; larger variants such as 2.2B offer better reasoning)"""
        print(f"🔥 Loading {model_name} for HuggingFace Segment-Based Analysis...")
        self.vlm_handler = SmolVLM2Handler(model_name=model_name, device=device)
        print(f"🖥️ Using device: {self.vlm_handler.get_model_info().get('device', 'unknown')}")
        print("✅ SmolVLM2 loaded successfully!")
    def get_video_duration_seconds(self, video_path: str) -> float:
        """Get video duration using ffprobe"""
        cmd = [
            "ffprobe", "-v", "quiet", "-show_entries",
            "format=duration", "-of", "csv=p=0", video_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to get video duration: {e}")
            return 0.0
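    # Equivalent shell command (input path illustrative):
    #   ffprobe -v quiet -show_entries format=duration -of csv=p=0 input.mp4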
    def _sentence_count(self, text: str) -> int:
        sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
        return len(sentences)
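    # Example: _sentence_count("One. Two!") returns 2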
    def _normalize_sentences(self, text: str, min_sentences: int, max_sentences: int) -> str:
        cleaned = text.replace("\n", " ").replace("**", "")
        cleaned = re.sub(r"\s+", " ", cleaned).strip()
        parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", cleaned) if p.strip()]
        sentences = []
        for part in parts:
            s = re.sub(r"^\d+\.\s*", "", part)
            s = re.sub(r"^[-*]\s*", "", s)
            if len(s.split()) >= 3:
                sentences.append(s)
        if not sentences:
            return cleaned
        if len(sentences) >= min_sentences:
            return " ".join(sentences[:max_sentences]).strip()
        return " ".join(sentences).strip()
    def analyze_video_content(self, video_path: str) -> str:
        """Get overall video description by analyzing multiple frames"""
        duration = self.get_video_duration_seconds(video_path)
        if duration <= 0:
            return "Unable to analyze video content"
        # Use five anchored points to support a grounded 4-5 sentence summary.
        frame_times = [duration * 0.1, duration * 0.3, duration * 0.5, duration * 0.7, duration * 0.9]
        descriptions = []
        seen = set()
        for i, time_point in enumerate(frame_times):
            with tempfile.NamedTemporaryFile(suffix=f'_frame_{i}.jpg', delete=False) as temp_frame:
                cmd = [
                    "ffmpeg", "-v", "quiet", "-i", video_path,
                    "-ss", str(time_point), "-vframes", "1", "-y", temp_frame.name
                ]
                try:
                    subprocess.run(cmd, check=True, capture_output=True)
                    prompt = (
                        f"Describe what is visibly happening in this frame at {time_point:.1f}s in exactly one factual sentence. "
                        "Mention subjects, actions, and setting. Do not guess unseen details."
                    )
                    description = self.vlm_handler.generate_response(
                        temp_frame.name,
                        prompt,
                        max_new_tokens=80,
                        temperature=0.2,
                        do_sample=False
                    )
                    sentence = self._normalize_sentences(description.strip(), 1, 1)
                    key = sentence.lower().strip()
                    if key and key not in seen:
                        seen.add(key)
                        descriptions.append(sentence)
                except subprocess.CalledProcessError as e:
                    logger.error(f"Failed to extract frame at {time_point}s: {e}")
                    continue
                finally:
                    # Clean up temp file
                    if os.path.exists(temp_frame.name):
                        os.unlink(temp_frame.name)
        if descriptions:
            composed = self._normalize_sentences(" ".join(descriptions[:5]), 4, 5)
            if self._sentence_count(composed) >= 4:
                return composed
            # Fallback: pull one extra midpoint frame if we still have fewer than 4 sentences.
            with tempfile.NamedTemporaryFile(suffix='_frame_mid.jpg', delete=False) as temp_frame:
                mid_time = duration * 0.5
                cmd = [
                    "ffmpeg", "-v", "quiet", "-i", video_path,
                    "-ss", str(mid_time), "-vframes", "1", "-y", temp_frame.name
                ]
                try:
                    subprocess.run(cmd, check=True, capture_output=True)
                    extra = self.vlm_handler.generate_response(
                        temp_frame.name,
                        "Describe this frame in exactly one factual sentence with visible actions and setting.",
                        max_new_tokens=80,
                        temperature=0.2,
                        do_sample=False
                    )
                    extra_sentence = self._normalize_sentences(extra.strip(), 1, 1)
                    if extra_sentence.lower().strip() not in seen:
                        descriptions.append(extra_sentence)
                except Exception:
                    pass
                finally:
                    if os.path.exists(temp_frame.name):
                        os.unlink(temp_frame.name)
            return self._normalize_sentences(" ".join(descriptions[:5]), 4, 5)
        else:
            return "Unable to analyze video content"
    def determine_highlights(self, video_description: str) -> Tuple[str, str]:
        """Generate simple, focused criteria based on actual video content"""
        # Instead of generating hallucinated criteria, use simple general criteria
        # that can be applied to any video segment
        criteria_set_1 = """Look for segments with:
- Significant movement or action
- Clear visual activity or events happening
- People interacting or doing activities
- Changes in scene or camera angle
- Dynamic or interesting visual content"""
        criteria_set_2 = """Look for segments with:
- Interesting facial expressions or gestures
- Multiple people or subjects in frame
- Good lighting and clear visibility
- Engaging activities or behaviors
- Visually appealing or well-composed shots"""
        return criteria_set_1, criteria_set_2
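    # Note: process_video below uses direct 1-10 scoring and does not consume these
    # criteria anywhere in this script (process_segment is passed an empty string).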
    def process_segment(self, video_path: str, start_time: float, end_time: float,
                        highlight_criteria: str, segment_num: int, total_segments: int) -> str:
        """Process a single segment and return its 1-10 highlight score as a string"""
        # Extract 3 frames from the segment for analysis
        segment_duration = end_time - start_time
        frame_times = [
            start_time + segment_duration * 0.2,  # 20% into segment
            start_time + segment_duration * 0.5,  # Middle of segment
            start_time + segment_duration * 0.8   # 80% into segment
        ]
        temp_frames = []
        try:
            # Extract frames
            for i, frame_time in enumerate(frame_times):
                temp_frame = tempfile.NamedTemporaryFile(suffix=f'_frame_{i}.jpg', delete=False)
                temp_frames.append(temp_frame.name)
                temp_frame.close()
                cmd = [
                    "ffmpeg", "-v", "quiet", "-i", video_path,
                    "-ss", str(frame_time), "-vframes", "1", "-y", temp_frame.name
                ]
                subprocess.run(cmd, check=True, capture_output=True)
            # Create prompt for segment classification - direct evaluation
            prompt = f"""Look at this frame from a {segment_duration:.1f}-second video segment.
Rate this video segment for highlight potential on a scale of 1-10, where:
- 1-3: Boring, static, nothing interesting happening
- 4-6: Moderately interesting, some activity or visual interest
- 7-10: Very interesting, dynamic action, engaging content worth highlighting
Consider:
- Amount of movement and activity
- Visual interest and composition
- People interactions or engaging behavior
- Overall entertainment value
Give ONLY a number from 1-10, nothing else."""
            # Get AI response using first frame (SmolVLM2Handler expects a single image)
            response = self.vlm_handler.generate_response(temp_frames[0], prompt)
            # Extract numeric score from the response (re is imported at module level)
            try:
                numbers = re.findall(r'\b(\d+)\b', response)
                if numbers:
                    score = int(numbers[0])
                    if 1 <= score <= 10:
                        print(f"   🤖 Score: {score}/10")
                        return str(score)
                print(f"   🤖 Response: {response} (couldn't extract valid score)")
                return "1"  # Default to low score if no valid number
            except (ValueError, IndexError):
                print(f"   🤖 Response: {response} (error parsing)")
                return "1"
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to process segment {segment_num}: {e}")
            return "1"  # Lowest score, so callers can still parse an int
        finally:
            # Clean up temp frames
            for temp_frame in temp_frames:
                if os.path.exists(temp_frame):
                    os.unlink(temp_frame)
    def create_video_segment(self, video_path: str, start_sec: float, end_sec: float, output_path: str) -> bool:
        """Create a video segment using ffmpeg."""
        cmd = [
            "ffmpeg",
            "-v", "quiet",  # Suppress FFmpeg output
            "-y",
            "-i", video_path,
            "-ss", str(start_sec),
            "-to", str(end_sec),
            "-c", "copy",  # Copy without re-encoding for speed
            output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to create segment: {e}")
            return False
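    # Note: with "-c copy" ffmpeg can only cut on keyframes, so segment boundaries may
    # shift by a fraction of a second; re-encoding (e.g. "-c:v", "libx264") would be
    # frame-accurate at the cost of speed.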
    def concatenate_scenes(self, video_path: str, scene_times: List[Tuple[float, float]],
                           output_path: str, with_effects: bool = True) -> bool:
        """Concatenate selected scenes with optional effects"""
        if with_effects:
            return self._concatenate_with_effects(video_path, scene_times, output_path)
        else:
            return self._concatenate_basic(video_path, scene_times, output_path)
    def _concatenate_basic(self, video_path: str, scene_times: List[Tuple[float, float]], output_path: str) -> bool:
        """Basic concatenation without effects"""
        if not scene_times:
            logger.error("No scenes to concatenate")
            return False
        # Create temporary files for each segment
        temp_files = []
        temp_list_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
        try:
            for i, (start_sec, end_sec) in enumerate(scene_times):
                temp_file = tempfile.NamedTemporaryFile(suffix=f'_segment_{i}.mp4', delete=False)
                temp_files.append(temp_file.name)
                temp_file.close()
                # Create segment
                if not self.create_video_segment(video_path, start_sec, end_sec, temp_file.name):
                    return False
                # Add to concat list
                temp_list_file.write(f"file '{temp_file.name}'\n")
            temp_list_file.close()
            # Concatenate all segments
            cmd = [
                "ffmpeg", "-v", "quiet", "-y",
                "-f", "concat", "-safe", "0",
                "-i", temp_list_file.name,
                "-c", "copy",
                output_path
            ]
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to concatenate scenes: {e}")
            return False
        finally:
            # Cleanup
            for temp_file in temp_files:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)
            if os.path.exists(temp_list_file.name):
                os.unlink(temp_list_file.name)
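    # The concat list file written above contains one line per segment, e.g.
    # (temp names illustrative):
    #   file '/tmp/tmpXXXX_segment_0.mp4'
    #   file '/tmp/tmpXXXX_segment_1.mp4'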
    def _concatenate_with_effects(self, video_path: str, scene_times: List[Tuple[float, float]], output_path: str) -> bool:
        """Simple concatenation with basic fade transitions."""
        filter_complex_parts = []
        concat_inputs = []
        # Simple fade duration
        fade_duration = 0.5
        for i, (start_sec, end_sec) in enumerate(scene_times):
            print(f"   ✨ Segment {i+1}: {start_sec:.1f}s - {end_sec:.1f}s ({end_sec-start_sec:.1f}s) with FADE effect")
            # Simple video effects: just trim and basic fade
            video_effects = (
                f"trim=start={start_sec}:end={end_sec},"
                f"setpts=PTS-STARTPTS,"
                f"fade=t=in:st=0:d={fade_duration},"
                f"fade=t=out:st={max(0, end_sec-start_sec-fade_duration)}:d={fade_duration}"
            )
            filter_complex_parts.append(f"[0:v]{video_effects}[v{i}];")
            # Simple audio effects: just trim and fade
            audio_effects = (
                f"atrim=start={start_sec}:end={end_sec},"
                f"asetpts=PTS-STARTPTS,"
                f"afade=t=in:st=0:d={fade_duration},"
                f"afade=t=out:st={max(0, end_sec-start_sec-fade_duration)}:d={fade_duration}"
            )
            filter_complex_parts.append(f"[0:a]{audio_effects}[a{i}];")
            concat_inputs.append(f"[v{i}][a{i}]")
        # Concatenate all segments (no trailing semicolon: ffmpeg rejects an empty final filter chain)
        concat_filter = f"{''.join(concat_inputs)}concat=n={len(scene_times)}:v=1:a=1[outv][outa]"
        filter_complex = "".join(filter_complex_parts) + concat_filter
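        # For two segments the assembled filter graph has the shape:
        #   [0:v]trim=...[v0];[0:a]atrim=...[a0];[0:v]trim=...[v1];[0:a]atrim=...[a1];
        #   [v0][a0][v1][a1]concat=n=2:v=1:a=1[outv][outa]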
        cmd = [
            "ffmpeg",
            "-v", "quiet",
            "-y",
            "-i", video_path,
            "-filter_complex", filter_complex,
            "-map", "[outv]",
            "-map", "[outa]",
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "128k",
            "-pix_fmt", "yuv420p",
            output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to concatenate scenes with effects: {e}")
            return False
    def _single_segment_with_effects(self, video_path: str, scene_time: Tuple[float, float], output_path: str) -> bool:
        """Apply simple effects to a single segment."""
        start_sec, end_sec = scene_time
        print(f"   ✨ Single segment: {start_sec:.1f}s - {end_sec:.1f}s ({end_sec-start_sec:.1f}s) with fade effect")
        # Simple video effects: just trim and fade
        video_effects = (
            f"trim=start={start_sec}:end={end_sec},"
            f"setpts=PTS-STARTPTS,"
            f"fade=t=in:st=0:d=0.5,"
            f"fade=t=out:st={max(0, end_sec-start_sec-0.5)}:d=0.5"
        )
        # Simple audio effects with fade
        audio_effects = (
            f"atrim=start={start_sec}:end={end_sec},"
            f"asetpts=PTS-STARTPTS,"
            f"afade=t=in:st=0:d=0.5,"
            f"afade=t=out:st={max(0, end_sec-start_sec-0.5)}:d=0.5"
        )
        cmd = [
            "ffmpeg",
            "-v", "quiet",
            "-y",
            "-i", video_path,
            "-vf", video_effects,
            "-af", audio_effects,
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "128k",
            "-pix_fmt", "yuv420p",
            output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to create single segment with effects: {e}")
            return False
    def process_video(self, video_path: str, output_path: str, segment_length: float = 5.0, with_effects: bool = True) -> Dict:
        """Process video using HuggingFace's segment-based approach."""
        print("🚀 Starting HuggingFace Segment-Based Video Highlight Detection")
        print(f"📁 Input: {video_path}")
        print(f"📁 Output: {output_path}")
        print(f"⏱️ Segment Length: {segment_length}s")
        print()
        # Get video duration
        duration = self.get_video_duration_seconds(video_path)
        if duration <= 0:
            return {"error": "Could not determine video duration"}
        print(f"📹 Video duration: {duration:.1f}s ({duration/60:.1f} minutes)")
        # Step 1: Analyze overall video content
        print("🎬 Step 1: Analyzing overall video content...")
        video_description = self.analyze_video_content(video_path)
        print("📝 Video Description:")
        print(f"   {video_description}")
        print()
        # Step 2: Direct scoring approach (no predefined criteria)
        print("🎯 Step 2: Using direct scoring approach - each segment rated 1-10 for highlight potential")
        print()
        # Step 3: Process segments with scoring
        num_segments = int(duration / segment_length) + (1 if duration % segment_length > 0 else 0)
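        # e.g. a 47s video at 5s segments -> int(47/5) = 9 full segments + 1 partial = 10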
| print(f"π Step 3: Processing {num_segments} segments of {segment_length}s each...") | |
| print(" Each segment will be scored 1-10 for highlight potential") | |
| print() | |
| segment_scores = [] | |
| for i in range(num_segments): | |
| start_time = i * segment_length | |
| end_time = min(start_time + segment_length, duration) | |
| progress = int((i / num_segments) * 100) if num_segments > 0 else 0 | |
| print(f"π Processing segment {i+1}/{num_segments} ({progress}%)") | |
| print(f" β° Time: {start_time:.0f}s - {end_time:.1f}s") | |
| # Get score for this segment | |
| score_str = self.process_segment(video_path, start_time, end_time, "", i+1, num_segments) | |
| try: | |
| score = int(score_str) | |
| segment_scores.append({ | |
| 'start': start_time, | |
| 'end': end_time, | |
| 'score': score | |
| }) | |
| if score >= 7: | |
| print(f" β HIGH SCORE ({score}/10) - Excellent highlight material") | |
| elif score >= 5: | |
| print(f" π‘ MEDIUM SCORE ({score}/10) - Moderate interest") | |
| else: | |
| print(f" β LOW SCORE ({score}/10) - Not highlight worthy") | |
| except ValueError: | |
| print(f" β Invalid score: {score_str}") | |
| segment_scores.append({ | |
| 'start': start_time, | |
| 'end': end_time, | |
| 'score': 1 | |
| }) | |
| print() | |
        # Sort segments by score and select top performers
        segment_scores.sort(key=lambda x: x['score'], reverse=True)
        # Select segments with score >= 6 (good highlight material)
        high_score_segments = [s for s in segment_scores if s['score'] >= 6]
        # If too few high-scoring segments, lower the threshold
        if len(high_score_segments) < 3:
            high_score_segments = [s for s in segment_scores if s['score'] >= 5]
        # If still too few, take top 20% of segments
        if len(high_score_segments) < 3:
            top_count = max(3, len(segment_scores) // 5)  # At least 3, or 20% of total
            high_score_segments = segment_scores[:top_count]
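        # e.g. with 12 segments, top_count = max(3, 12 // 5) = 3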
        # Restore chronological order so the highlight reel plays in the original sequence
        selected_segments = sorted((s['start'], s['end']) for s in high_score_segments)
| print("π Results Summary:") | |
| print(f" π Average score: {sum(s['score'] for s in segment_scores) / len(segment_scores):.1f}/10") | |
| print(f" π High-scoring segments (β₯6): {len([s for s in segment_scores if s['score'] >= 6])}") | |
| print(f" β Selected for highlights: {len(selected_segments)} segments ({len(selected_segments)/num_segments*100:.1f}% of video)") | |
| print() | |
| if not selected_segments: | |
| return { | |
| "error": "No segments had sufficient scores for highlights", | |
| "video_description": video_description, | |
| "segment_scores": segment_scores, | |
| "total_segments": num_segments | |
| } | |
        # Step 4: Create highlights video
        print(f"🎬 Step 4: Concatenating {len(selected_segments)} selected segments with {'beautiful effects & transitions' if with_effects else 'basic concatenation'}...")
        success = self.concatenate_scenes(video_path, selected_segments, output_path, with_effects)
        if success:
            print("✅ Highlights video created successfully!")
            total_duration = sum(end - start for start, end in selected_segments)
            print(f"🎉 SUCCESS! Created highlights with {len(selected_segments)} segments")
            print(f"   📹 Total highlight duration: {total_duration:.1f}s")
            print(f"   📊 Percentage of original video: {total_duration/duration*100:.1f}%")
        else:
            print("❌ Failed to create highlights video")
            return {"error": "Failed to create highlights video"}
        # Return analysis results
        return {
            "success": True,
            "video_description": video_description,
            "scoring_approach": "Direct segment scoring (1-10 scale)",
            "total_segments": num_segments,
            "selected_segments": len(selected_segments),
            "selected_times": selected_segments,
            "segment_scores": segment_scores,
            "average_score": sum(s['score'] for s in segment_scores) / len(segment_scores),
            "total_duration": total_duration,
            "compression_ratio": total_duration / duration,
            "output_path": output_path
        }

def main():
    parser = argparse.ArgumentParser(description='HuggingFace Segment-Based Video Highlights')
    parser.add_argument('video_path', help='Path to input video file')
    parser.add_argument('--output', required=True, help='Path to output highlights video')
    parser.add_argument('--save-analysis', action='store_true', help='Save analysis results to JSON')
    parser.add_argument('--segment-length', type=float, default=5.0, help='Length of each segment in seconds (default: 5.0)')
    parser.add_argument('--model', default='HuggingFaceTB/SmolVLM2-256M-Video-Instruct', help='SmolVLM2 model to use')
    parser.add_argument('--device', default='auto', choices=['auto', 'cpu', 'cuda', 'mps'], help='Inference device')
    parser.add_argument('--effects', action='store_true', default=True, help='Enable beautiful effects & transitions (default: True)')
    parser.add_argument('--no-effects', action='store_true', help='Disable effects - basic concatenation only')
    args = parser.parse_args()
    # Handle effects flag
    with_effects = args.effects and not args.no_effects
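    # Note: --effects already defaults to True, so --no-effects is the switch that
    # actually changes behavior here.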
| print("π HuggingFace Approach SmolVLM2 Video Highlights") | |
| print(" Based on: https://huggingface.co/spaces/HuggingFaceTB/SmolVLM2-HighlightGenerator") | |
| print(f" Model: {args.model}") | |
| print(f" Requested Device: {args.device}") | |
| print(f" Effects: {'β¨ Beautiful effects & transitions enabled' if with_effects else 'π§ Basic concatenation only'}") | |
| print() | |
    # Initialize detector
    detector = HuggingFaceVideoHighlightDetector(model_name=args.model, device=args.device)
    # Process video
    results = detector.process_video(
        video_path=args.video_path,
        output_path=args.output,
        segment_length=args.segment_length,
        with_effects=with_effects
    )
    # Save analysis if requested
    if args.save_analysis and 'error' not in results:
        # Derive the JSON path from the output name; append if there is no .mp4 suffix
        # (a plain str.replace could clobber the video file when '.mp4' is absent)
        analysis_path = re.sub(r'\.mp4$', '', args.output) + '_hf_analysis.json'
        with open(analysis_path, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"📄 Analysis saved: {analysis_path}")
    if 'error' in results:
        print(f"❌ {results['error']}")
        sys.exit(1)


if __name__ == "__main__":
    main()
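# A minimal programmatic sketch (paths illustrative, assumes the same environment):
#   detector = HuggingFaceVideoHighlightDetector()
#   results = detector.process_video("input.mp4", "highlights.mp4", segment_length=5.0)
#   print(results.get("average_score"))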