Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Audio-Enhanced Video Highlights Generator | |
| Combines SmolVLM2 visual analysis with Whisper audio transcription | |
| Supports 99+ languages including Telugu, Hindi, English | |
| """ | |
import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional

import cv2
from PIL import Image
| # Add src directory to path for imports | |
| sys.path.append(str(Path(__file__).parent / "src")) | |
| try: | |
| from src.smolvlm2_handler import SmolVLM2Handler | |
| except ImportError: | |
| print("β SmolVLM2Handler not found. Make sure to install dependencies first.") | |
| sys.exit(1) | |
# Optional dependency: Whisper provides audio transcription. The rest of the
# pipeline degrades gracefully when it is missing (AudioVisualAnalyzer keeps
# whisper_model=None and audio analysis is skipped), so we must NOT hard-exit
# here — the previous sys.exit(1) made every WHISPER_AVAILABLE=False branch
# below unreachable.
try:
    import whisper
    WHISPER_AVAILABLE = True
    print("β Whisper available for audio transcription")
except ImportError:
    WHISPER_AVAILABLE = False
    print("β Whisper not available. Install with: pip install openai-whisper")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AudioVisualAnalyzer:
    """Comprehensive analyzer combining visual and audio analysis.

    Visual scoring comes from SmolVLM2 frame analysis; audio scoring comes
    from Whisper transcription of the matching audio segment. Either side can
    be switched off (enable_visual=False, or visual_only_mode=True to skip
    loading Whisper entirely).
    """

    def __init__(self, whisper_model_size="base", timeout_seconds=90, enable_visual=True, visual_only_mode=False):
        """Initialize with SmolVLM2 and optionally Whisper models.

        Args:
            whisper_model_size: Whisper checkpoint name ("tiny".."large").
            timeout_seconds: Per-frame timeout for visual analysis.
            enable_visual: When False, SmolVLM2 is not loaded at all.
            visual_only_mode: When True, Whisper is not loaded (audio skipped).
        """
        print("π§ Initializing Visual Analyzer...")
        self.enable_visual = enable_visual
        self.visual_only_mode = visual_only_mode
        # Initialize SmolVLM2 for visual analysis
        if self.enable_visual:
            print("π₯ Loading SmolVLM2...")
            self.vlm_handler = SmolVLM2Handler()
        else:
            print("π Visual analysis disabled")
            self.vlm_handler = None
        self.timeout_seconds = timeout_seconds
        # Skip Whisper loading in visual-only mode to save memory/resources
        if self.visual_only_mode:
            print("ποΈ Visual-only mode enabled - skipping audio processing to optimize performance")
            self.whisper_model = None
        elif WHISPER_AVAILABLE:
            print(f"π₯ Loading Whisper model ({whisper_model_size})...")
            self.whisper_model = whisper.load_model(whisper_model_size)
            print("β Whisper model loaded successfully")
        else:
            self.whisper_model = None
            print("β οΈ Whisper not available - audio analysis disabled")

    def extract_audio_segments(self, video_path: str, segments: List[Dict]) -> List[Optional[str]]:
        """Extract audio for specific video segments.

        Returns one entry per input segment: the path to a 16 kHz mono WAV
        file, or None when extraction failed / the segment has no audio.
        All files live in a fresh temp directory; the caller is responsible
        for deleting them (see analyze_segment).
        """
        audio_files = []
        temp_dir = tempfile.mkdtemp()
        for i, segment in enumerate(segments):
            start_time = segment['start_time']
            duration = segment['duration']
            audio_path = os.path.join(temp_dir, f"segment_{i}.wav")
            # Extract audio segment using FFmpeg
            cmd = [
                'ffmpeg', '-i', video_path,
                '-ss', str(start_time),
                '-t', str(duration),
                '-vn',                      # No video
                '-acodec', 'pcm_s16le',     # Uncompressed audio
                '-ar', '16000',             # 16kHz sample rate for Whisper
                '-ac', '1',                 # Mono
                '-f', 'wav',                # Force WAV format
                '-y',                       # Overwrite
                audio_path
            ]
            try:
                subprocess.run(cmd, check=True, capture_output=True, text=True)
                if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
                    audio_files.append(audio_path)
                    logger.info(f"π Extracted audio segment {i+1}: {duration:.1f}s")
                else:
                    logger.warning(f"β οΈ Audio segment {i+1} is empty or missing")
                    audio_files.append(None)
            except subprocess.CalledProcessError:
                logger.warning(f"β οΈ No audio stream in segment {i+1} (this is normal for silent videos)")
                audio_files.append(None)
        return audio_files

    def transcribe_audio_segment(self, audio_path: str) -> Dict:
        """Transcribe an audio segment with Whisper (auto language detection).

        Returns a dict with keys "text", "language", "confidence". Falls back
        to an empty result when the model is unavailable (visual-only mode or
        Whisper not installed) or the file is missing.
        """
        # Guard on the model itself, not only WHISPER_AVAILABLE: whisper_model
        # is None in visual-only mode even when the package is installed.
        if self.whisper_model is None or not audio_path or not os.path.exists(audio_path):
            return {"text": "", "language": "unknown", "confidence": 0.0}
        try:
            result = self.whisper_model.transcribe(
                audio_path,
                language=None,  # Auto-detect language
                task="transcribe"
            )
            return {
                "text": result.get("text", "").strip(),
                "language": result.get("language", "unknown"),
                "confidence": 1.0  # Whisper doesn't provide confidence scores
            }
        except Exception as e:
            logger.error(f"β Audio transcription failed: {e}")
            return {"text": "", "language": "unknown", "confidence": 0.0}

    def analyze_visual_content(self, frame_path: str) -> Dict:
        """Analyze visual content using SmolVLM2 with robust error handling.

        Runs the model in a daemon thread so a hung inference can be abandoned
        after self.timeout_seconds. Retries up to twice on timeout, on known
        numeric-instability errors, and on empty responses; every failure mode
        returns a neutral fallback dict instead of raising.
        """
        # If visual analysis is disabled, return audio-focused fallback
        if not self.enable_visual or self.vlm_handler is None:
            logger.info("πΉ Visual analysis disabled, using audio-only mode")
            return {"description": "Audio-only analysis mode - visual analysis disabled", "score": 7.0}
        max_retries = 2
        retry_count = 0
        while retry_count < max_retries:
            try:
                def generate_with_timeout():
                    prompt = ("Analyze this video frame for interesting, engaging, or highlight-worthy content. "
                              "IMPORTANT: Start your response with 'Score: X/10' where X is a number from 1-10. "
                              "Then explain what makes it noteworthy. Focus on action, emotion, important moments, or visually striking elements. "
                              "Rate based on: Action/movement (high scores), People talking/interacting (medium-high), "
                              "Static scenes (low-medium), Boring/empty scenes (low scores).")
                    return self.vlm_handler.generate_response(frame_path, prompt)

                # Run with timeout protection: results travel via single-item
                # lists because threads cannot return values directly.
                thread_result = [None]
                exception_result = [None]

                def target():
                    try:
                        thread_result[0] = generate_with_timeout()
                    except Exception as e:
                        exception_result[0] = e

                thread = threading.Thread(target=target)
                thread.daemon = True  # don't block interpreter exit on a hung model
                thread.start()
                thread.join(self.timeout_seconds)
                if thread.is_alive():
                    logger.warning(f"β° Visual analysis timed out after {self.timeout_seconds}s (attempt {retry_count + 1})")
                    retry_count += 1
                    if retry_count >= max_retries:
                        logger.info("π Switching to audio-only mode due to visual timeout")
                        return {"description": "Visual analysis timed out - using audio-only mode", "score": 7.0}
                    continue
                if exception_result[0]:
                    error_msg = str(exception_result[0])
                    # Known transient inference failures (sampling instability)
                    # are retried; anything else is re-raised to the outer handler.
                    if "probability tensor" in error_msg or "inf" in error_msg or "nan" in error_msg:
                        logger.warning(f"β οΈ Model inference error, retrying (attempt {retry_count + 1}): {error_msg}")
                        retry_count += 1
                        if retry_count >= max_retries:
                            return {"description": "Model inference failed after retries", "score": 6.0}
                        continue
                    else:
                        raise exception_result[0]
                response = thread_result[0]
                if not response or len(response.strip()) == 0:
                    logger.warning(f"β οΈ Empty response, retrying (attempt {retry_count + 1})")
                    retry_count += 1
                    if retry_count >= max_retries:
                        return {"description": "No meaningful response after retries", "score": 6.0}
                    continue
                # Extract score from response
                score = self.extract_score_from_text(response)
                return {"description": response, "score": score}
            except Exception as e:
                error_msg = str(e)
                logger.warning(f"β οΈ Visual analysis error (attempt {retry_count + 1}): {error_msg}")
                retry_count += 1
                if retry_count >= max_retries:
                    return {"description": f"Analysis failed after {max_retries} attempts: {error_msg}", "score": 6.0}
        # Fallback if all retries failed
        return {"description": "Analysis failed after all retry attempts", "score": 6.0}

    def extract_score_from_text(self, text: str) -> float:
        """Extract a numeric 1-10 score from model output text.

        Tries progressively looser regex patterns ("Score: 8/10", "8/10",
        "rating=7.5", "8 out of 10", bare number); the first match wins and
        is clamped to [1.0, 10.0]. Returns 6.0 when nothing matches.
        """
        import re
        # Look for patterns like "Score: 8/10", "8/10", "score: 7", etc.
        patterns = [
            r'score:\s*(\d+(?:\.\d+)?)\s*/\s*10',                    # "Score: 8/10" (our new format)
            r'(\d+(?:\.\d+)?)\s*/\s*10',                             # "8/10" or "7.5/10"
            r'(?:score|rating|rate)(?:\s*[:=]\s*)(\d+(?:\.\d+)?)',   # "score: 8" or "rating=7.5"
            r'(\d+(?:\.\d+)?)\s*(?:out of|/)\s*10',                  # "8 out of 10"
            r'(?:^|\s)(\d+(?:\.\d+)?)(?:\s*[/]\s*10)?(?:\s|$)',      # Just numbers
        ]
        for pattern in patterns:
            matches = re.findall(pattern, text.lower())
            if matches:
                try:
                    score = float(matches[0])
                    return min(max(score, 1.0), 10.0)  # Clamp between 1-10
                except ValueError:
                    continue
        return 6.0  # Default score if no pattern found

    def calculate_combined_score(self, visual_score: float, audio_text: str, audio_lang: str) -> float:
        """Combine the visual score with bonuses derived from the transcript.

        Bonuses: +0.5 per matched excitement/action/emotion keyword (English
        and a small Telugu list), +0.1/+0.3 for transcript length, +0.2 for
        Telugu or Hindi content. Result is clamped to [1.0, 10.0].
        """
        # Start with visual score
        combined_score = visual_score
        # Audio content scoring
        if audio_text:
            audio_bonus = 0.0
            text_lower = audio_text.lower()
            # Positive indicators
            excitement_words = ['amazing', 'incredible', 'wow', 'fantastic', 'awesome', 'perfect', 'excellent']
            action_words = ['goal', 'win', 'victory', 'success', 'breakthrough', 'achievement']
            emotion_words = ['happy', 'excited', 'thrilled', 'surprised', 'shocked', 'love']
            # Telugu positive indicators (basic)
            telugu_positive = ['ΰ° ΰ°¦ΰ±ΰ°ΰ±ΰ°€ΰ°', 'ΰ°ΰ°Ύΰ°²ΰ°Ύ ΰ°¬ΰ°Ύΰ°ΰ±ΰ°ΰ°¦ΰ°Ώ', 'ΰ°΅ΰ°Ύΰ°΅ΰ±', 'ΰ°Έΰ±ΰ°ͺΰ°°ΰ±']
            # Count positive indicators
            for word_list in [excitement_words, action_words, emotion_words, telugu_positive]:
                for word in word_list:
                    if word in text_lower:
                        audio_bonus += 0.5
            # Length bonus for substantial content
            if len(audio_text) > 50:
                audio_bonus += 0.3
            elif len(audio_text) > 20:
                audio_bonus += 0.1
            # Language diversity bonus
            if audio_lang in ['te', 'telugu']:    # Telugu content
                audio_bonus += 0.2
            elif audio_lang in ['hi', 'hindi']:   # Hindi content
                audio_bonus += 0.2
            combined_score += audio_bonus
        # Clamp final score
        return min(max(combined_score, 1.0), 10.0)

    def analyze_segment(self, video_path: str, segment: Dict, temp_frame_path: str) -> Dict:
        """Analyze a single video segment with both visual and audio.

        Args:
            video_path: Path to the source video.
            segment: Dict with 'start_time' and 'duration' (seconds).
            temp_frame_path: Path to an already-extracted frame image.

        Returns a result dict with per-modality scores, the transcript, the
        combined score, and a 'selected' flag (False here; set by the caller).
        """
        start_time = segment['start_time']
        duration = segment['duration']
        logger.info(f"π Analyzing segment at {start_time:.1f}s ({duration:.1f}s duration)")
        # Visual analysis
        visual_analysis = self.analyze_visual_content(temp_frame_path)
        # Skip audio analysis in visual-only mode to save resources
        if self.visual_only_mode:
            logger.info("ποΈ Visual-only mode: skipping audio analysis")
            audio_analysis = {"text": "", "language": "unknown", "confidence": 0.0}
            # Use pure visual score for highlights
            combined_score = visual_analysis['score']
        else:
            # Audio analysis
            audio_files = self.extract_audio_segments(video_path, [segment])
            audio_analysis = {"text": "", "language": "unknown", "confidence": 0.0}
            if audio_files and audio_files[0]:
                audio_analysis = self.transcribe_audio_segment(audio_files[0])
                # Cleanup the temporary WAV *and* its scratch directory (the
                # previous version leaked one mkdtemp directory per segment).
                try:
                    segment_dir = os.path.dirname(audio_files[0])
                    os.unlink(audio_files[0])
                    os.rmdir(segment_dir)  # holds only this one file
                except OSError:
                    pass  # best-effort cleanup; never fail the analysis
            # Combined scoring
            combined_score = self.calculate_combined_score(
                visual_analysis['score'],
                audio_analysis['text'],
                audio_analysis['language']
            )
        return {
            'start_time': start_time,
            'duration': duration,
            'visual_score': visual_analysis['score'],
            'visual_description': visual_analysis['description'],
            'audio_text': audio_analysis['text'],
            'audio_language': audio_analysis['language'],
            'combined_score': combined_score,
            'selected': False
        }
def extract_frames_at_intervals(video_path: str, interval_seconds: float = 10.0) -> List[Dict]:
    """Extract frame/segment descriptors at regular intervals from a video.

    Args:
        video_path: Path to the video file.
        interval_seconds: Spacing between segment start times.

    Returns:
        A list of dicts with 'start_time', 'duration' (the last segment is
        truncated to the video end) and 'frame_number'.

    Raises:
        ValueError: If the file cannot be opened or reports invalid metadata.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Guard against broken metadata: fps of 0 would divide by zero below.
        if fps <= 0:
            raise ValueError(f"Invalid FPS ({fps}) reported for video: {video_path}")
        duration = total_frames / fps
        logger.info(f"πΉ Video: {duration:.1f}s, {fps:.1f} FPS, {total_frames} frames")
        segments = []
        current_time = 0
        while current_time < duration:
            segment_duration = min(interval_seconds, duration - current_time)
            segments.append({
                'start_time': current_time,
                'duration': segment_duration,
                'frame_number': int(current_time * fps)
            })
            current_time += interval_seconds
        return segments
    finally:
        # Always release the capture handle, even on error paths.
        cap.release()
def save_frame_at_time(video_path: str, time_seconds: float, output_path: str) -> bool:
    """Save a single frame at the given timestamp with robust extraction.

    Clamps the target frame index into range, then makes up to three attempts
    (nudging forward, then backward, around the target) to read a valid BGR
    frame and write it to output_path.

    Returns True on success, False on any failure.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        return False
    try:
        fps = capture.get(cv2.CAP_PROP_FPS)
        frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp the requested frame index to [0, frame_total - 1].
        target = max(min(int(time_seconds * fps), frame_total - 1), 0)
        for offset in range(3):
            try:
                # Prefer target+offset; fall back to target-offset when that
                # runs past the end, and to the target itself when negative.
                candidate = target + offset
                if candidate >= frame_total:
                    candidate = target - offset
                if candidate < 0:
                    candidate = target
                capture.set(cv2.CAP_PROP_POS_FRAMES, candidate)
                ok, image = capture.read()
                # A usable frame must be non-empty, 3-dimensional and 3-channel.
                if ok and image is not None and image.size > 0:
                    if len(image.shape) == 3 and image.shape[2] == 3:
                        if cv2.imwrite(output_path, image):
                            capture.release()
                            return True
            except Exception as e:
                logger.warning(f"Frame extraction attempt {offset + 1} failed: {e}")
                continue
        capture.release()
        return False
    except Exception as e:
        logger.error(f"Critical error in frame extraction: {e}")
        capture.release()
        return False
def create_highlights_video(video_path: str, selected_segments: List[Dict], output_path: str) -> bool:
    """Create a highlights video by cutting and concatenating segments.

    Each segment is cut with stream copy (no re-encode) into a temp directory,
    then joined via FFmpeg's concat demuxer.

    Args:
        video_path: Source video path.
        selected_segments: Dicts with 'start_time' and 'duration' (seconds).
        output_path: Destination for the concatenated highlights video.

    Returns True on success, False otherwise. The temp directory is removed
    on every exit path (the previous version leaked it on failures).
    """
    if not selected_segments:
        logger.error("β No segments selected for highlights")
        return False
    temp_dir = tempfile.mkdtemp()
    try:
        # Cut each selected segment into its own temporary file.
        temp_files = []
        for i, segment in enumerate(selected_segments):
            temp_file = os.path.join(temp_dir, f"segment_{i}.mp4")
            cmd = [
                'ffmpeg', '-i', video_path,
                '-ss', str(segment['start_time']),
                '-t', str(segment['duration']),
                '-c', 'copy',  # Copy streams without re-encoding
                '-y', temp_file
            ]
            try:
                subprocess.run(cmd, check=True, capture_output=True)
                temp_files.append(temp_file)
                logger.info(f"β Created segment {i+1}/{len(selected_segments)}")
            except subprocess.CalledProcessError as e:
                logger.error(f"β Failed to create segment {i+1}: {e}")
                continue
        if not temp_files:
            logger.error("β No valid segments created")
            return False
        # Build the concat-demuxer list file.
        concat_file = os.path.join(temp_dir, "concat.txt")
        with open(concat_file, 'w') as f:
            for temp_file in temp_files:
                f.write(f"file '{temp_file}'\n")
        # Concatenate segments
        cmd = [
            'ffmpeg', '-f', 'concat', '-safe', '0',
            '-i', concat_file,
            '-c', 'copy',
            '-y', output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
            logger.info(f"β Highlights video created: {output_path}")
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"β Failed to create highlights video: {e}")
            return False
    finally:
        # Remove segment files, concat list and the directory itself,
        # regardless of which path we exit through.
        shutil.rmtree(temp_dir, ignore_errors=True)
def main():
    """CLI entry point: parse args, score video segments, build highlights.

    Pipeline: sample one frame every --interval seconds, score each segment
    visually (SmolVLM2) and aurally (Whisper), keep the top --max-highlights
    segments scoring >= --min-score, and concatenate them with FFmpeg.
    Exits with status 1 on any validation or processing failure.
    """
    parser = argparse.ArgumentParser(description="Audio-Enhanced Video Highlights Generator")
    parser.add_argument("video_path", help="Path to input video file")
    parser.add_argument("--output", "-o", default="audio_enhanced_highlights.mp4",
                        help="Output highlights video path")
    parser.add_argument("--interval", "-i", type=float, default=10.0,
                        help="Analysis interval in seconds (default: 10.0)")
    parser.add_argument("--min-score", "-s", type=float, default=7.0,
                        help="Minimum score for highlights (default: 7.0)")
    parser.add_argument("--max-highlights", "-m", type=int, default=5,
                        help="Maximum number of highlights (default: 5)")
    parser.add_argument("--whisper-model", "-w", default="base",
                        choices=["tiny", "base", "small", "medium", "large"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--timeout", "-t", type=int, default=30,
                        help="Timeout for each analysis in seconds (default: 30)")
    parser.add_argument("--save-analysis", action="store_true",
                        help="Save detailed analysis to JSON file")
    args = parser.parse_args()
    # Validate input
    if not os.path.exists(args.video_path):
        print(f"β Video file not found: {args.video_path}")
        sys.exit(1)
    print("π¬ Audio-Enhanced Video Highlights Generator")
    print(f"π Input: {args.video_path}")
    print(f"π Output: {args.output}")
    print(f"β±οΈ Analysis interval: {args.interval}s")
    print(f"π― Minimum score: {args.min_score}")
    print(f"π Max highlights: {args.max_highlights}")
    print(f"ποΈ Whisper model: {args.whisper_model}")
    print()
    try:
        # Initialize analyzer
        analyzer = AudioVisualAnalyzer(
            whisper_model_size=args.whisper_model,
            timeout_seconds=args.timeout
        )
        # Extract segments for analysis
        segments = extract_frames_at_intervals(args.video_path, args.interval)
        print(f"π Analyzing {len(segments)} segments...")
        analyzed_segments = []
        # Keep the scratch frame in a managed temp directory instead of the
        # CWD (which may be read-only or shared between concurrent runs).
        with tempfile.TemporaryDirectory() as frame_dir:
            temp_frame_path = os.path.join(frame_dir, "temp_frame.jpg")
            for i, segment in enumerate(segments):
                print(f"\nπ Segment {i+1}/{len(segments)} (t={segment['start_time']:.1f}s)")
                # Save frame for visual analysis
                if save_frame_at_time(args.video_path, segment['start_time'], temp_frame_path):
                    # Analyze segment
                    analysis = analyzer.analyze_segment(args.video_path, segment, temp_frame_path)
                    analyzed_segments.append(analysis)
                    print(f" ποΈ Visual: {analysis['visual_score']:.1f}/10")
                    print(f" ποΈ Audio: '{analysis['audio_text'][:50]}...' ({analysis['audio_language']})")
                    print(f" π― Combined: {analysis['combined_score']:.1f}/10")
                else:
                    print(f" β Failed to extract frame")
                # Cleanup temp frame between iterations
                try:
                    os.unlink(temp_frame_path)
                except OSError:
                    pass
        if not analyzed_segments:
            print("β No segments analyzed successfully")
            sys.exit(1)
        # Select best segments (highest combined score first)
        analyzed_segments.sort(key=lambda x: x['combined_score'], reverse=True)
        selected_segments = [s for s in analyzed_segments if s['combined_score'] >= args.min_score]
        selected_segments = selected_segments[:args.max_highlights]
        # Mark winners so the saved analysis JSON reflects the selection
        # (the 'selected' flag was previously never updated from False).
        for s in selected_segments:
            s['selected'] = True
        print(f"\nπ Selected {len(selected_segments)} highlights:")
        for i, segment in enumerate(selected_segments):
            print(f"{i+1}. t={segment['start_time']:.1f}s, score={segment['combined_score']:.1f}")
            if segment['audio_text']:
                print(f" Audio: \"{segment['audio_text'][:100]}...\"")
        if not selected_segments:
            print(f"β No segments met minimum score of {args.min_score}")
            sys.exit(1)
        # Create highlights video
        print(f"\n㪠Creating highlights video...")
        success = create_highlights_video(args.video_path, selected_segments, args.output)
        if success:
            print(f"β Audio-enhanced highlights created: {args.output}")
            # Save analysis if requested
            if args.save_analysis:
                analysis_file = args.output.replace('.mp4', '_analysis.json')
                with open(analysis_file, 'w') as f:
                    json.dump({
                        'input_video': args.video_path,
                        'output_video': args.output,
                        'settings': {
                            'interval': args.interval,
                            'min_score': args.min_score,
                            'max_highlights': args.max_highlights,
                            'whisper_model': args.whisper_model,
                            'timeout': args.timeout
                        },
                        'segments': analyzed_segments,
                        'selected_segments': selected_segments
                    }, f, indent=2)
                print(f"π Analysis saved: {analysis_file}")
        else:
            print("β Failed to create highlights video")
            sys.exit(1)
    except KeyboardInterrupt:
        print("\nβΉοΈ Operation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"β Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()