Spaces:
Sleeping
Sleeping
| """ | |
| Whisper-based Video Trimming and Frame Extraction | |
| Uses OpenAI's Whisper to detect last spoken word and find optimal transition frames | |
| """ | |
| import re | |
| from typing import List, Tuple, Optional | |
| import tempfile | |
| import os | |
# Whisper can be reached in two ways:
#   1. imported directly into this interpreter (preferred), or
#   2. run out-of-process through a separate "system" Python installation.
# WHISPER_AVAILABLE says whether either route works; USE_SYSTEM_WHISPER says
# that only route 2 works. Both default to False and are switched on below.
WHISPER_AVAILABLE = False
USE_SYSTEM_WHISPER = False

try:
    import whisper
    from moviepy.editor import VideoFileClip
    WHISPER_AVAILABLE = True
except ImportError:
    # Try to use system Python's Whisper
    import subprocess
    import sys

    SYSTEM_PYTHON = "/opt/anaconda3/bin/python"

    if os.path.exists(SYSTEM_PYTHON):
        try:
            # Test if system Python has whisper
            result = subprocess.run(
                [SYSTEM_PYTHON, "-c", "import whisper; print('OK')"],
                capture_output=True,
                timeout=5
            )
        except (OSError, subprocess.SubprocessError):
            # The probe could not be started or timed out: treat Whisper as
            # unavailable. Only these errors are caught so that
            # KeyboardInterrupt / SystemExit still propagate.
            pass
        else:
            if result.returncode == 0:
                WHISPER_AVAILABLE = True
                USE_SYSTEM_WHISPER = True

if not WHISPER_AVAILABLE:
    print("β οΈ Whisper not available. Install with: pip install openai-whisper moviepy")
def normalize_text(text: str) -> str:
    """Return *text* lowercased, without punctuation or surrounding whitespace.

    Every character that is neither a word character nor whitespace is
    removed. Whitespace is stripped AFTER the punctuation is removed, so a
    token such as ``"end ."`` normalizes to ``"end"`` rather than ``"end "``.
    Interior whitespace is left untouched.
    """
    return re.sub(r"[^\w\s]", "", text.lower()).strip()
def transcribe_video(
    video_path: str,
    model_size: str = "base"
) -> Tuple[str, Optional[float]]:
    """
    Run Whisper on a video's audio and report when the speech ends.

    Args:
        video_path: Path to video file
        model_size: Whisper model size (tiny, base, small, medium, large)

    Returns:
        Tuple of (full_transcription, last_word_end_time). The time is None
        when no segment carries word-level timing.

    Raises:
        ImportError: if Whisper is not available at all.
    """
    if not WHISPER_AVAILABLE:
        raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy")

    if USE_SYSTEM_WHISPER:
        # Whisper only exists in the external interpreter; delegate to it.
        return _transcribe_video_system(video_path, model_size)

    print(f"π€ Loading Whisper model ({model_size})...")
    whisper_model = whisper.load_model(model_size)

    print("π€ Transcribing audio...")
    transcription = whisper_model.transcribe(video_path, word_timestamps=True)

    full_text = transcription.get("text", "").strip()

    # Walk the segments backwards; the first one that has any words supplies
    # the end time of the last spoken word.
    last_time = next(
        (
            seg["words"][-1].get("end")
            for seg in reversed(transcription.get("segments", []))
            if seg.get("words")
        ),
        None,
    )

    # Long transcriptions are previewed by their first 100 characters.
    if len(full_text) > 100:
        print(f"π Transcribed: \"{full_text[:100]}...\"")
    else:
        print(f"π Transcribed: \"{full_text}\"")

    if last_time:
        print(f"β Last word ends at {last_time:.2f} seconds")

    return full_text, last_time
def _transcribe_video_system(
    video_path: str,
    model_size: str = "base"
) -> Tuple[str, Optional[float]]:
    """Transcribe video using system Python's Whisper.

    A small worker program is run with the system interpreter via
    ``python -c``. The video path and model size are handed over as
    command-line arguments, so no temporary script file is written and the
    values are never spliced into source code.

    Args:
        video_path: Path to video file
        model_size: Whisper model size (tiny, base, small, medium, large)

    Returns:
        Tuple of (full_transcription, last_word_end_time); the time is None
        when the worker found no word-level timing.

    Raises:
        Exception: if the worker cannot be started, times out (5 minutes),
            exits with a non-zero status, or prints output that is not the
            expected JSON object.
    """
    import subprocess
    import json

    SYSTEM_PYTHON = "/opt/anaconda3/bin/python"

    # Program executed by the system interpreter. It prints exactly one JSON
    # object (as its last stdout line) describing the transcription.
    worker_source = '''
import json
import sys

import whisper

video_path, model_size = sys.argv[1], sys.argv[2]
model = whisper.load_model(model_size)
result = model.transcribe(video_path, word_timestamps=True)

last_time = None
for seg in reversed(result.get("segments", [])):
    words = seg.get("words", [])
    if words:
        last_time = words[-1].get("end")
        break

print(json.dumps({"text": result.get("text", "").strip(), "last_time": last_time}))
'''

    print(f"π€ Using system Whisper (model: {model_size})...")

    try:
        completed = subprocess.run(
            [SYSTEM_PYTHON, "-c", worker_source, video_path, model_size],
            capture_output=True,
            text=True,
            timeout=300  # 5 minute timeout
        )
    except subprocess.TimeoutExpired as e:
        raise Exception("Whisper transcription timed out") from e
    except OSError as e:
        # The interpreter is missing or not executable.
        raise Exception(f"System Whisper error: {str(e)}") from e

    if completed.returncode != 0:
        raise Exception(f"Whisper transcription failed: {completed.stderr}")

    # Only the last non-empty stdout line is parsed, so anything a library
    # prints before the result does not break decoding.
    stdout_lines = [line for line in completed.stdout.splitlines() if line.strip()]
    try:
        output = json.loads(stdout_lines[-1])
    except (IndexError, json.JSONDecodeError) as e:
        raise Exception(f"Failed to parse Whisper output: {str(e)}") from e
    if not isinstance(output, dict):
        raise Exception("Failed to parse Whisper output: expected a JSON object")

    full_text = output.get("text", "").strip()
    last_time = output.get("last_time")

    if len(full_text) > 100:
        print(f"π Transcribed: \"{full_text[:100]}...\"")
    else:
        print(f"π Transcribed: \"{full_text}\"")

    # Compare against None so a timestamp of 0.0 is still reported.
    if last_time is not None:
        print(f"β Last word ends at {last_time:.2f} seconds")

    return full_text, last_time
def find_last_word_timestamp(
    video_path: str,
    script: str,
    model_size: str = "base"
) -> Optional[float]:
    """
    Find the timestamp of the last spoken word in the script

    The video is transcribed with word-level timestamps and the end time of
    the LAST occurrence of the script's final word is returned. If that word
    was not transcribed, the up-to-four words preceding it in the script are
    tried in turn (closest to the end first), again using each word's last
    occurrence.

    Args:
        video_path: Path to video file
        script: Expected script/dialogue
        model_size: Whisper model size (tiny, base, small, medium, large)

    Returns:
        Timestamp (seconds) of last word, or None if not found

    Raises:
        ImportError: if Whisper is not available.
        ValueError: if the script has no words, or (in-process path) no
            speech segments were detected.
    """
    if not WHISPER_AVAILABLE:
        raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy")

    # Use system Python if needed
    if USE_SYSTEM_WHISPER:
        return _find_last_word_timestamp_system(video_path, script, model_size)

    # Validate the script before paying for model loading and transcription.
    script_words = normalize_text(script).split()
    if not script_words:
        raise ValueError("Script is empty")

    print(f"π€ Loading Whisper model ({model_size})...")
    model = whisper.load_model(model_size)

    print("π€ Transcribing audio...")
    result = model.transcribe(video_path, word_timestamps=True)

    segments = result.get("segments", [])
    if not segments:
        raise ValueError("No speech segments detected in video")

    # Map each normalized transcribed word to the end time of its last
    # occurrence: later entries overwrite earlier ones as segments are walked
    # in order. Entries without an end time are skipped.
    last_end_by_word = {}
    for seg in segments:
        for word_info in seg.get("words", []):
            end_time = word_info.get("end")
            if end_time is not None:
                last_end_by_word[normalize_text(word_info.get("word", ""))] = end_time

    last_word = script_words[-1]
    print(f"π Looking for last word: '{last_word}'")
    last_time = last_end_by_word.get(last_word)

    if last_time is None:
        # Fallback: try the words just before the last one, nearest first.
        print(f"β οΈ Last word '{last_word}' not found, trying other words...")
        for word_to_find in reversed(script_words[-5:-1]):
            last_time = last_end_by_word.get(word_to_find)
            if last_time is not None:
                print(f"β Found '{word_to_find}' at {last_time:.2f}s instead")
                break

    # Compare against None so a timestamp of 0.0 still counts as found.
    if last_time is not None:
        print(f"β Last spoken word ends at {last_time:.2f} seconds")

    return last_time
def _probe_video_duration(video_path: str) -> float:
    """Return the duration of *video_path* in seconds (moviepy, else ffprobe)."""
    clip_cls = None
    try:
        from moviepy.editor import VideoFileClip as clip_cls
    except ImportError:
        try:
            # NOTE(review): newer moviepy releases are expected to expose
            # VideoFileClip at package top level - confirm against the
            # installed version.
            from moviepy import VideoFileClip as clip_cls
        except ImportError:
            clip_cls = None

    if clip_cls is not None:
        clip = clip_cls(video_path)
        try:
            return clip.duration
        finally:
            # Always release the reader, even if reading the duration fails.
            clip.close()

    # moviepy is not importable (e.g. Whisper runs through the system
    # Python), so ask ffprobe for the container duration instead.
    import subprocess

    probe = subprocess.run(
        [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_path,
        ],
        capture_output=True,
        text=True
    )
    if probe.returncode != 0:
        raise Exception(f"ffprobe failed: {probe.stderr}")
    return float(probe.stdout.strip())


def extract_post_speech_frames(
    video_path: str,
    script: str,
    buffer_time: float = 0.3,
    num_frames: int = 3,
    model_size: str = "base"
) -> List[Tuple[float, str, str]]:
    """
    Extract frames from the post-speech zone (after last spoken word)

    The zone starts ``buffer_time`` seconds after the last spoken word (but
    no later than 0.5 s before the end of the video) and runs to the end of
    the video. Frames are sampled evenly from the start of the zone up to
    0.1 s before the end; no frame is requested at the exact end of the
    video, matching the single-frame cases. All timestamps are clamped to be
    non-negative.

    Args:
        video_path: Path to video file
        script: Expected script/dialogue
        buffer_time: Time after last word to start extracting (seconds)
        num_frames: Number of frames to extract
        model_size: Whisper model size

    Returns:
        List of (timestamp, frame_data, label) tuples. ``frame_data`` is
        whatever ``extract_frame(..., return_base64=True)`` returns
        (presumably a base64 data URL - verify against that helper).

    Raises:
        ImportError: if Whisper is not available.
        ValueError: if the last spoken word cannot be located.
    """
    if not WHISPER_AVAILABLE:
        raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy")

    # Find last word timestamp
    last_word_time = find_last_word_timestamp(video_path, script, model_size)
    if last_word_time is None:
        raise ValueError("Could not find last spoken word in video")

    # Get video duration
    duration = _probe_video_duration(video_path)

    # Calculate post-speech zone
    post_speech_start = max(0.0, min(last_word_time + buffer_time, duration - 0.5))
    post_speech_end = duration
    print(f"π Post-speech zone: {post_speech_start:.2f}s to {post_speech_end:.2f}s")

    # Latest timestamp that is sampled: slightly before the end of the video.
    final_timestamp = max(0.0, duration - 0.1)

    available_time = post_speech_end - post_speech_start
    if available_time < 0.1 or num_frames == 1:
        # Very little time (or a single frame requested): just use the end
        timestamps = [final_timestamp]
    else:
        # Distribute frames evenly between the zone start and the final
        # sampled timestamp.
        step = max(0.0, final_timestamp - post_speech_start) / (num_frames - 1)
        timestamps = [post_speech_start + (i * step) for i in range(num_frames)]

    # Extract frames
    from utils.video_processor import extract_frame

    frames = []
    for i, timestamp in enumerate(timestamps):
        print(f"πΈ Extracting frame at {timestamp:.2f}s...")
        frame_data = extract_frame(video_path, timestamp, return_base64=True)

        # Create label based on position
        if i == 0 and len(timestamps) > 1:
            label = "Right After Speech"
        elif i == len(timestamps) - 1:
            label = "Final Frame"
        else:
            label = f"Frame {i+1}"

        frames.append((timestamp, frame_data, label))

    return frames
def trim_video_to_last_word(
    video_path: str,
    script: str,
    output_path: str,
    padding: float = 0.5,
    model_size: str = "base"
) -> str:
    """
    Cut a video so that it ends shortly after the last spoken word.

    FFmpeg is first asked to stream-copy the leading part of the file; if
    that fails, the same cut is retried with re-encoding (libx264 / aac).

    Args:
        video_path: Input video path
        script: Expected script/dialogue
        output_path: Output video path
        padding: Time to keep after last word (seconds)
        model_size: Whisper model size

    Returns:
        Path to trimmed video

    Raises:
        ImportError: if Whisper is not available.
        ValueError: if the last spoken word cannot be located.
        Exception: if both FFmpeg attempts fail.
    """
    import subprocess

    if not WHISPER_AVAILABLE:
        raise ImportError("Whisper not installed. Run: pip install openai-whisper moviepy")

    spoken_until = find_last_word_timestamp(video_path, script, model_size)
    if spoken_until is None:
        raise ValueError("Could not find last spoken word in video")

    trim_time = spoken_until + padding
    print(f"βοΈ Trimming video to {trim_time:.2f} seconds...")

    # Both attempts share the input and the duration to keep.
    common_args = ["ffmpeg", "-i", video_path, "-t", str(trim_time)]

    # First attempt: copy the streams (fast, no re-encoding).
    copy_cmd = common_args + [
        "-c", "copy",
        "-avoid_negative_ts", "make_zero",
        "-y",
        output_path,
    ]
    outcome = subprocess.run(copy_cmd, capture_output=True, text=True)

    if outcome.returncode != 0:
        # Second attempt: re-encode video and audio.
        print("β οΈ Copy codec failed, re-encoding...")
        reencode_cmd = common_args + [
            "-c:v", "libx264",
            "-c:a", "aac",
            "-y",
            output_path,
        ]
        outcome = subprocess.run(reencode_cmd, capture_output=True, text=True)
        if outcome.returncode != 0:
            raise Exception(f"FFmpeg trimming failed: {outcome.stderr}")

    print(f"β Trimmed video saved to: {output_path}")
    return output_path
def _find_last_word_timestamp_system(
    video_path: str,
    script: str,
    model_size: str = "base"
) -> Optional[float]:
    """Find last word timestamp using system Python.

    Runs the Whisper command-line interface through the system interpreter,
    asking it to write a JSON transcription with word timestamps into a
    private temporary directory. That JSON is then searched for the script's
    last word. The directory and everything in it is removed afterwards,
    whatever the outcome.

    If the last word was not transcribed, the up-to-four script words before
    it are tried in turn (closest to the end first), mirroring the
    in-process lookup. Each word is matched at its last occurrence.

    Args:
        video_path: Path to video file
        script: Expected script/dialogue
        model_size: Whisper model size (tiny, base, small, medium, large)

    Returns:
        Timestamp (seconds) of the matched word's end, or None when the
        script has no words or none of the candidate words was transcribed.

    Raises:
        Exception: if the Whisper run exits with a non-zero status or does
            not produce the expected JSON file.
        subprocess.TimeoutExpired: if the run exceeds 5 minutes.
    """
    import subprocess
    import json
    import tempfile

    SYSTEM_PYTHON = "/opt/anaconda3/bin/python"

    # Nothing to look for: skip the expensive transcription entirely.
    script_words = normalize_text(script).split()
    if not script_words:
        return None

    print(f"π€ Using system Whisper (model: {model_size})...")

    # A private directory keeps the output from colliding with other runs on
    # a video of the same base name, and guarantees cleanup.
    with tempfile.TemporaryDirectory() as output_dir:
        # Run whisper via system Python
        cmd = [
            SYSTEM_PYTHON, "-m", "whisper",
            video_path,
            "--model", model_size,
            "--output_format", "json",
            "--output_dir", output_dir,
            "--word_timestamps", "True"
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            raise Exception(f"Whisper transcription failed: {result.stderr}")

        # The output file is looked up under the video's base name
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        json_path = os.path.join(output_dir, f"{base_name}.json")
        if not os.path.exists(json_path):
            raise Exception(f"JSON output not found: {json_path}")

        with open(json_path, "r", encoding="utf-8") as f:
            transcription_data = json.load(f)

    # Map each normalized transcribed word to the end time of its last
    # occurrence; entries without an end time are ignored.
    last_end_by_word = {}
    for seg in transcription_data.get("segments", []):
        for word_info in seg.get("words", []):
            end_time = word_info.get("end")
            if end_time is not None:
                last_end_by_word[normalize_text(word_info.get("word", ""))] = end_time

    last_word = script_words[-1]
    last_time = last_end_by_word.get(last_word)

    if last_time is None:
        # Fallback: try the words just before the last one, nearest first.
        print(f"β οΈ Last word '{last_word}' not found, trying other words...")
        for word_to_find in reversed(script_words[-5:-1]):
            last_time = last_end_by_word.get(word_to_find)
            if last_time is not None:
                print(f"β Found '{word_to_find}' at {last_time:.2f}s instead")
                break

    return last_time
def is_whisper_available() -> bool:
    """Report whether Whisper can be used, in-process or via system Python."""
    available: bool = WHISPER_AVAILABLE
    return available