Spaces:
Sleeping
Sleeping
| """ | |
| Whisper-based Video Analysis Service | |
| Optimized endpoint that finds trim point and extracts frame in one call | |
| """ | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import tempfile | |
| import os | |
| import httpx | |
# FastAPI router for the Whisper video-analysis endpoints in this module
router = APIRouter()

# Check Whisper availability at import time. If the optional dependencies
# (the local utils wrapping openai-whisper / ffmpeg) are missing, the module
# still imports and the endpoints fall back to non-Whisper behavior instead
# of crashing the whole app on startup.
try:
    from utils.whisper_trim import find_last_word_timestamp, transcribe_video, is_whisper_available
    from utils.video_processor import extract_frame, get_video_info
    # Even with the modules importable, the runtime model may be unusable;
    # is_whisper_available() makes the final call.
    WHISPER_AVAILABLE = is_whisper_available()
except ImportError:
    WHISPER_AVAILABLE = False
class WhisperAnalyzeRequest(BaseModel):
    """Request payload for the combined trim-point + frame-extraction endpoint."""
    video_url: str            # URL of the video to download and analyze
    dialogue: str             # The expected dialogue/script for this segment
    buffer_time: float = 0.3  # Time after last word for frame extraction
    model_size: str = "base"  # Whisper model size
class WhisperAnalyzeResponse(BaseModel):
    """Response payload: trim metadata plus the extracted continuity frame.

    On failure, `success` is False and `error` carries the reason; the
    optional fields stay None.
    """
    success: bool
    last_word_timestamp: Optional[float] = None  # When last word ends
    trim_point: Optional[float] = None           # Recommended trim point (last_word + buffer)
    frame_timestamp: Optional[float] = None      # Where frame was extracted
    frame_base64: Optional[str] = None           # Base64 encoded frame image
    video_duration: float = 0                    # Total video duration
    transcribed_text: Optional[str] = None       # What Whisper actually heard (for consistency check)
    error: Optional[str] = None                  # Failure reason when success is False
async def analyze_and_extract_frame(request: WhisperAnalyzeRequest):
    """
    Analyze video with Whisper to find the last spoken word,
    then extract a frame at that point for visual continuity.

    This is the optimized flow:
    1. Download video
    2. Use Whisper to find last spoken word timestamp
    3. Extract frame at (last_word_time + buffer)
    4. Return frame + trim metadata

    The trim metadata can be used later during final merge.

    Args:
        request: video URL, expected dialogue, buffer time, and model size.

    Returns:
        WhisperAnalyzeResponse — populated on success; on any failure,
        success=False with an error message (this coroutine never raises).
    """
    temp_video = None
    try:
        # Download video to temp file. tempfile.mkstemp (not the deprecated,
        # race-prone tempfile.mktemp) creates the file atomically; close the
        # OS-level fd right away and reopen by path for the write below.
        print(f"π€ Downloading video for Whisper analysis...")
        fd, temp_video = tempfile.mkstemp(suffix='.mp4')
        os.close(fd)
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(request.video_url)
            if response.status_code != 200:
                return WhisperAnalyzeResponse(
                    success=False,
                    error=f"Failed to download video: {response.status_code}"
                )
            with open(temp_video, 'wb') as f:
                f.write(response.content)

        # Get video duration from the container metadata (ffprobe-style dict)
        video_info = get_video_info(temp_video)
        video_duration = float(video_info['format']['duration'])
        print(f"πΉ Video duration: {video_duration:.2f}s")

        # Try Whisper-based analysis
        last_word_time = None
        frame_base64 = None
        trim_point = None
        frame_timestamp = None
        transcribed_text = None

        if WHISPER_AVAILABLE:
            try:
                print(f"π€ Running Whisper transcription (model: {request.model_size})...")
                # Get full transcription and last word timestamp
                transcribed_text, last_word_time = transcribe_video(
                    video_path=temp_video,
                    model_size=request.model_size
                )
                if last_word_time and last_word_time > 0:
                    print(f"β Last spoken word at: {last_word_time:.2f}s")
                    # Trim just after the last word, never past the end of the
                    # clip. Clamp the frame grab to >= 0: for clips shorter
                    # than 0.1s, (video_duration - 0.1) would go negative and
                    # make the frame extraction seek before the start.
                    trim_point = min(last_word_time + request.buffer_time, video_duration)
                    frame_timestamp = max(
                        0.0,
                        min(last_word_time + request.buffer_time, video_duration - 0.1)
                    )
                    print(f"π Trim point: {trim_point:.2f}s, Frame at: {frame_timestamp:.2f}s")
                else:
                    print(f"β οΈ Could not find last word, using fallback")
            except Exception as whisper_err:
                # Best-effort: log and fall through to the end-of-video fallback
                print(f"β οΈ Whisper analysis failed: {str(whisper_err)}")
        else:
            print("β οΈ Whisper not available, using fallback")

        # Fallback: use end of video (max() keeps it >= 0 for sub-0.5s clips)
        if frame_timestamp is None:
            frame_timestamp = max(0, video_duration - 0.5)
            trim_point = video_duration
            print(f"π Fallback: Frame at {frame_timestamp:.2f}s (near end)")

        # Extract frame at the calculated timestamp (uncompressed for continuity)
        print(f"πΈ Extracting frame at {frame_timestamp:.2f}s")
        frame_base64 = extract_frame(
            video_path=temp_video,
            timestamp=frame_timestamp,
            return_base64=True,
            compress=False  # No compression for continuity frames
        )
        print(f"β Frame extracted successfully")

        return WhisperAnalyzeResponse(
            success=True,
            last_word_timestamp=last_word_time,
            trim_point=trim_point,
            frame_timestamp=frame_timestamp,
            frame_base64=frame_base64,
            video_duration=video_duration,
            transcribed_text=transcribed_text,
            error=None
        )
    except Exception as e:
        # Top-level boundary: surface the failure in the response body
        # instead of letting it escape as an unhandled 500.
        print(f"β Whisper analyze error: {str(e)}")
        import traceback
        traceback.print_exc()
        return WhisperAnalyzeResponse(
            success=False,
            error=str(e)
        )
    finally:
        # Clean up temp file; narrow except so real bugs aren't silently hidden
        if temp_video and os.path.exists(temp_video):
            try:
                os.remove(temp_video)
            except OSError:
                pass
async def whisper_status():
    """Report whether Whisper transcription support is installed and ready."""
    if WHISPER_AVAILABLE:
        message = "Whisper is ready"
    else:
        message = "Whisper not installed. Run: pip install openai-whisper moviepy"
    return {"available": WHISPER_AVAILABLE, "message": message}