""" Whisper-based Video Trimming and Frame Extraction Uses OpenAI's Whisper to detect last spoken word and find optimal transition frames """ import re from typing import List, Tuple, Optional import tempfile import os try: import whisper from moviepy.editor import VideoFileClip WHISPER_AVAILABLE = True USE_SYSTEM_WHISPER = False except ImportError: # Try to use system Python's Whisper import subprocess import sys SYSTEM_PYTHON = "/opt/anaconda3/bin/python" if os.path.exists(SYSTEM_PYTHON): try: # Test if system Python has whisper result = subprocess.run( [SYSTEM_PYTHON, "-c", "import whisper; print('OK')"], capture_output=True, timeout=5 ) if result.returncode == 0: WHISPER_AVAILABLE = True USE_SYSTEM_WHISPER = True else: WHISPER_AVAILABLE = False USE_SYSTEM_WHISPER = False except: WHISPER_AVAILABLE = False USE_SYSTEM_WHISPER = False else: WHISPER_AVAILABLE = False USE_SYSTEM_WHISPER = False if not WHISPER_AVAILABLE: print("⚠️ Whisper not available. Install with: pip install openai-whisper moviepy") def normalize_text(text: str) -> str: """Normalize text by removing punctuation and converting to lowercase""" return re.sub(r"[^\w\s]", "", text.lower().strip()) def transcribe_video( video_path: str, model_size: str = "base" ) -> Tuple[str, Optional[float]]: """ Transcribe video audio and find the last word timestamp. Args: video_path: Path to video file model_size: Whisper model size (tiny, base, small, medium, large) Returns: Tuple of (full_transcription, last_word_end_time) """ if not WHISPER_AVAILABLE: raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy") # Use system Python if needed if USE_SYSTEM_WHISPER: return _transcribe_video_system(video_path, model_size) print(f"🎤 Loading Whisper model ({model_size})...") model = whisper.load_model(model_size) print("🎤 Transcribing audio...") result = model.transcribe(video_path, word_timestamps=True) # Get full transcription text full_text = result.get("text", "").strip() # Get last word timestamp segments = result.get("segments", []) last_time = None if segments: # Find the end time of the last word for seg in reversed(segments): words = seg.get("words", []) if words: last_time = words[-1].get("end") break print(f"📝 Transcribed: \"{full_text[:100]}...\"" if len(full_text) > 100 else f"📝 Transcribed: \"{full_text}\"") if last_time: print(f"✅ Last word ends at {last_time:.2f} seconds") return full_text, last_time def _transcribe_video_system( video_path: str, model_size: str = "base" ) -> Tuple[str, Optional[float]]: """Transcribe video using system Python's Whisper""" import subprocess import json SYSTEM_PYTHON = "/opt/anaconda3/bin/python" print(f"🎤 Using system Whisper (model: {model_size})...") # Create a temporary Python script file to avoid shell escaping issues script_file = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) try: script_file.write(f""" import whisper import json import sys video_path = {json.dumps(video_path)} model_size = {json.dumps(model_size)} model = whisper.load_model(model_size) result = model.transcribe(video_path, word_timestamps=True) # Extract transcription and last word timestamp full_text = result.get("text", "").strip() segments = result.get("segments", []) last_time = None if segments: for seg in reversed(segments): words = seg.get("words", []) if words: last_time = words[-1].get("end") break output = {{ "text": full_text, "last_time": last_time }} print(json.dumps(output)) """) script_file.close() result = subprocess.run( [SYSTEM_PYTHON, script_file.name], capture_output=True, text=True, timeout=300 # 5 minute timeout ) if result.returncode != 0: raise Exception(f"Whisper transcription failed: {result.stderr}") output = json.loads(result.stdout.strip()) full_text = output.get("text", "").strip() last_time = output.get("last_time") print(f"📝 Transcribed: \"{full_text[:100]}...\"" if len(full_text) > 100 else f"📝 Transcribed: \"{full_text}\"") if last_time: print(f"✅ Last word ends at {last_time:.2f} seconds") return full_text, last_time except subprocess.TimeoutExpired: raise Exception("Whisper transcription timed out") except json.JSONDecodeError as e: raise Exception(f"Failed to parse Whisper output: {str(e)}") except Exception as e: raise Exception(f"System Whisper error: {str(e)}") finally: # Clean up script file try: if os.path.exists(script_file.name): os.remove(script_file.name) except: pass def find_last_word_timestamp( video_path: str, script: str, model_size: str = "base" ) -> Optional[float]: """ Find the timestamp of the last spoken word in the script Args: video_path: Path to video file script: Expected script/dialogue model_size: Whisper model size (tiny, base, small, medium, large) Returns: Timestamp (seconds) of last word, or None if not found """ if not WHISPER_AVAILABLE: raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy") # Use system Python if needed if USE_SYSTEM_WHISPER: return _find_last_word_timestamp_system(video_path, script, model_size) print(f"🎤 Loading Whisper model ({model_size})...") model = whisper.load_model(model_size) print("🎤 Transcribing audio...") result = model.transcribe(video_path, word_timestamps=True) segments = result.get("segments", []) if not segments: raise ValueError("No speech segments detected in video") # Find last word in script script_clean = normalize_text(script) script_words = script_clean.split() if not script_words: raise ValueError("Script is empty") last_word = script_words[-1] print(f"🔍 Looking for last word: '{last_word}'") # Search for last occurrence of that word in transcription last_time = None for seg in segments: for word_info in seg.get("words", []): word_text = normalize_text(word_info["word"]) if word_text == last_word: last_time = word_info["end"] if last_time is None: # Fallback: try to find any word from the end of script print(f"⚠️ Last word '{last_word}' not found, trying other words...") for i in range(min(5, len(script_words))): # Try last 5 words word_to_find = script_words[-(i+1)] for seg in segments: for word_info in seg.get("words", []): word_text = normalize_text(word_info["word"]) if word_text == word_to_find: last_time = word_info["end"] print(f"✅ Found '{word_to_find}' at {last_time:.2f}s instead") break if last_time: break if last_time: break if last_time: print(f"✅ Last spoken word ends at {last_time:.2f} seconds") return last_time def extract_post_speech_frames( video_path: str, script: str, buffer_time: float = 0.3, num_frames: int = 3, model_size: str = "base" ) -> List[Tuple[float, str]]: """ Extract frames from the post-speech zone (after last spoken word) Args: video_path: Path to video file script: Expected script/dialogue buffer_time: Time after last word to start extracting (seconds) num_frames: Number of frames to extract model_size: Whisper model size Returns: List of (timestamp, base64_data_url) tuples """ if not WHISPER_AVAILABLE: raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy") # Find last word timestamp last_word_time = find_last_word_timestamp(video_path, script, model_size) if last_word_time is None: raise ValueError("Could not find last spoken word in video") # Get video duration clip = VideoFileClip(video_path) duration = clip.duration clip.close() # Calculate post-speech zone post_speech_start = min(last_word_time + buffer_time, duration - 0.5) post_speech_end = duration print(f"📍 Post-speech zone: {post_speech_start:.2f}s to {post_speech_end:.2f}s") # Calculate frame timestamps available_time = post_speech_end - post_speech_start if available_time < 0.1: # Very little time, just use the end timestamps = [duration - 0.1] else: # Distribute frames evenly in post-speech zone if num_frames == 1: timestamps = [post_speech_end - 0.1] else: step = available_time / (num_frames - 1) timestamps = [post_speech_start + (i * step) for i in range(num_frames)] # Extract frames from utils.video_processor import extract_frame frames = [] for i, timestamp in enumerate(timestamps): print(f"📸 Extracting frame at {timestamp:.2f}s...") frame_data = extract_frame(video_path, timestamp, return_base64=True) # Create label based on position if i == 0 and len(timestamps) > 1: label = "Right After Speech" elif i == len(timestamps) - 1: label = "Final Frame" else: label = f"Frame {i+1}" frames.append((timestamp, frame_data, label)) return frames def trim_video_to_last_word( video_path: str, script: str, output_path: str, padding: float = 0.5, model_size: str = "base" ) -> str: """ Trim video to end shortly after the last spoken word Args: video_path: Input video path script: Expected script/dialogue output_path: Output video path padding: Time to keep after last word (seconds) model_size: Whisper model size Returns: Path to trimmed video """ if not WHISPER_AVAILABLE: raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy") # Find last word timestamp last_word_time = find_last_word_timestamp(video_path, script, model_size) if last_word_time is None: raise ValueError("Could not find last spoken word in video") # Calculate trim point trim_time = last_word_time + padding print(f"✂️ Trimming video to {trim_time:.2f} seconds...") # Use FFmpeg for trimming (more reliable than moviepy, especially with system Python) import subprocess # FFmpeg command to trim video cmd = [ "ffmpeg", "-i", video_path, "-t", str(trim_time), # Duration to keep "-c", "copy", # Copy codecs (fast, no re-encoding) "-avoid_negative_ts", "make_zero", "-y", # Overwrite output file output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: # If copy codec fails, try re-encoding print("⚠️ Copy codec failed, re-encoding...") cmd = [ "ffmpeg", "-i", video_path, "-t", str(trim_time), "-c:v", "libx264", "-c:a", "aac", "-y", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise Exception(f"FFmpeg trimming failed: {result.stderr}") print(f"✅ Trimmed video saved to: {output_path}") return output_path def _find_last_word_timestamp_system( video_path: str, script: str, model_size: str = "base" ) -> Optional[float]: """Find last word timestamp using system Python""" import subprocess import json import tempfile SYSTEM_PYTHON = "/opt/anaconda3/bin/python" print(f"🎤 Using system Whisper (model: {model_size})...") # Create temp file for JSON output temp_json = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) temp_json.close() try: # Run whisper via system Python cmd = [ SYSTEM_PYTHON, "-m", "whisper", video_path, "--model", model_size, "--output_format", "json", "--output_dir", os.path.dirname(temp_json.name), "--word_timestamps", "True" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode != 0: raise Exception(f"Whisper transcription failed: {result.stderr}") # Find JSON file base_name = os.path.splitext(os.path.basename(video_path))[0] json_path = os.path.join(os.path.dirname(temp_json.name), f"{base_name}.json") if not os.path.exists(json_path): raise Exception(f"JSON output not found: {json_path}") with open(json_path, 'r') as f: transcription_data = json.load(f) # Find last word script_clean = normalize_text(script) script_words = script_clean.split() if not script_words: return None last_word = script_words[-1] segments = transcription_data.get("segments", []) last_time = None for seg in segments: for word_info in seg.get("words", []): word_text = normalize_text(word_info.get("word", "")) if word_text == last_word: last_time = word_info.get("end", 0) # Cleanup try: os.remove(json_path) except: pass return last_time finally: try: os.remove(temp_json.name) except: pass def is_whisper_available() -> bool: """Check if Whisper is installed and available""" return WHISPER_AVAILABLE