Spaces:
Sleeping
Sleeping
| """ | |
| Frame Extraction API endpoints | |
| Intelligent frame selection using Whisper | |
| """ | |
| from fastapi import APIRouter, HTTPException, UploadFile, File, Form | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import tempfile | |
| import os | |
| from utils.whisper_trim import ( | |
| extract_post_speech_frames, | |
| find_last_word_timestamp, | |
| trim_video_to_last_word, | |
| is_whisper_available | |
| ) | |
| router = APIRouter() | |
| class FrameExtractionRequest(BaseModel): | |
| video_url: str | |
| script: str | |
| buffer_time: Optional[float] = 0.3 | |
| num_frames: Optional[int] = 3 | |
| model_size: Optional[str] = "base" | |
| class FrameExtractionResponse(BaseModel): | |
| frames: List[dict] # [{timestamp, frame_data_url, label}] | |
| last_word_time: float | |
| total_duration: float | |
| async def extract_frames_api(request: FrameExtractionRequest): | |
| """ | |
| Extract transition frames using Whisper to detect last spoken word | |
| """ | |
| if not is_whisper_available(): | |
| raise HTTPException( | |
| status_code=501, | |
| detail="Whisper not installed. Install with: pip install openai-whisper moviepy" | |
| ) | |
| try: | |
| # Download video temporarily | |
| import httpx | |
| temp_video = tempfile.mktemp(suffix='.mp4') | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(request.video_url) | |
| if response.status_code != 200: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Failed to download video: {response.status_code}" | |
| ) | |
| with open(temp_video, 'wb') as f: | |
| f.write(response.content) | |
| frames = [] | |
| last_word_time = None | |
| try: | |
| # Prefer Whisper-based post-speech detection | |
| frames = extract_post_speech_frames( | |
| temp_video, | |
| request.script, | |
| buffer_time=request.buffer_time, | |
| num_frames=request.num_frames, | |
| model_size=request.model_size | |
| ) | |
| # Get last word timestamp | |
| last_word_time = find_last_word_timestamp( | |
| temp_video, | |
| request.script, | |
| model_size=request.model_size | |
| ) | |
| except Exception as whisper_err: | |
| # Fallback: simple fixed timestamps near the end of the video | |
| print(f"⚠️ Whisper-based frame extraction failed: {whisper_err}") | |
| try: | |
| from moviepy.editor import VideoFileClip | |
| from utils.video_processor import extract_frame | |
| clip = VideoFileClip(temp_video) | |
| duration = clip.duration | |
| clip.close() | |
| fallback_timestamps = [ | |
| max(0, duration - 1.5), | |
| max(0, duration - 1.0), | |
| max(0, duration - 0.5), | |
| ] | |
| labels = ["Early End", "Mid End", "Final Frame"] | |
| for ts, label in zip(fallback_timestamps, labels): | |
| frame_data = extract_frame(temp_video, ts, return_base64=True) | |
| frames.append((ts, frame_data, label)) | |
| last_word_time = fallback_timestamps[-1] if fallback_timestamps else None | |
| print("✅ Returned fallback frames near video end.") | |
| except Exception as fallback_err: | |
| print(f"❌ Fallback frame extraction failed: {fallback_err}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Frame extraction failed: {str(whisper_err)}" | |
| ) | |
| # Get video duration | |
| from moviepy.editor import VideoFileClip | |
| clip = VideoFileClip(temp_video) | |
| duration = clip.duration | |
| clip.close() | |
| # Clean up | |
| os.remove(temp_video) | |
| # Format response | |
| frames_data = [ | |
| { | |
| "timestamp": timestamp, | |
| "frame_data_url": frame_data, | |
| "label": label | |
| } | |
| for timestamp, frame_data, label in frames | |
| ] | |
| return FrameExtractionResponse( | |
| frames=frames_data, | |
| last_word_time=last_word_time, | |
| total_duration=duration | |
| ) | |
| except Exception as e: | |
| # Clean up temp file if it exists | |
| if 'temp_video' in locals() and os.path.exists(temp_video): | |
| os.remove(temp_video) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Frame extraction failed: {str(e)}" | |
| ) | |
| async def trim_video_api( | |
| video_url: str = Form(...), | |
| script: str = Form(...), | |
| padding: float = Form(0.5), | |
| model_size: str = Form("base") | |
| ): | |
| """ | |
| Trim video to end after last spoken word | |
| """ | |
| if not is_whisper_available(): | |
| raise HTTPException( | |
| status_code=501, | |
| detail="Whisper not installed. Install with: pip install openai-whisper moviepy" | |
| ) | |
| try: | |
| # Download video temporarily | |
| import httpx | |
| temp_video = tempfile.mktemp(suffix='.mp4') | |
| output_video = tempfile.mktemp(suffix='_trimmed.mp4') | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(video_url) | |
| if response.status_code != 200: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Failed to download video: {response.status_code}" | |
| ) | |
| with open(temp_video, 'wb') as f: | |
| f.write(response.content) | |
| # Trim video | |
| output_path = trim_video_to_last_word( | |
| temp_video, | |
| script, | |
| output_video, | |
| padding=padding, | |
| model_size=model_size | |
| ) | |
| # Read trimmed video | |
| with open(output_path, 'rb') as f: | |
| video_data = f.read() | |
| # Clean up | |
| os.remove(temp_video) | |
| os.remove(output_video) | |
| # Return trimmed video | |
| from fastapi.responses import Response | |
| return Response( | |
| content=video_data, | |
| media_type="video/mp4", | |
| headers={ | |
| "Content-Disposition": "attachment; filename=trimmed_video.mp4" | |
| } | |
| ) | |
| except Exception as e: | |
| # Clean up temp files if they exist | |
| for temp_file in ['temp_video', 'output_video']: | |
| if temp_file in locals() and os.path.exists(locals()[temp_file]): | |
| os.remove(locals()[temp_file]) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Video trimming failed: {str(e)}" | |
| ) | |
| async def whisper_status(): | |
| """ | |
| Check if Whisper is available | |
| """ | |
| return { | |
| "available": is_whisper_available(), | |
| "message": "Whisper is available" if is_whisper_available() | |
| else "Install with: pip install openai-whisper moviepy" | |
| } | |