# Video_AdGenesis_App / api / frame_extraction.py
# sushilideaclan01's picture
# first push
# 91d209c
"""
Frame Extraction API endpoints
Intelligent frame selection using Whisper
"""
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import tempfile
import os
from utils.whisper_trim import (
extract_post_speech_frames,
find_last_word_timestamp,
trim_video_to_last_word,
is_whisper_available
)
router = APIRouter()
class FrameExtractionRequest(BaseModel):
    """Request body for the ``/extract-frames`` endpoint."""

    # URL the server downloads the source video from.
    video_url: str
    # Script text forwarded to extract_post_speech_frames and
    # find_last_word_timestamp.
    script: str
    # Forwarded as ``buffer_time`` to extract_post_speech_frames.
    # NOTE(review): presumably seconds -- confirm in utils.whisper_trim.
    buffer_time: Optional[float] = 0.3
    # Forwarded as ``num_frames`` to extract_post_speech_frames.
    num_frames: Optional[int] = 3
    # Forwarded as ``model_size`` to both Whisper helpers.
    model_size: Optional[str] = "base"
class FrameExtractionResponse(BaseModel):
    """Response body returned by the ``/extract-frames`` endpoint."""

    # One dict per frame with the keys ``timestamp``, ``frame_data_url``
    # and ``label``.
    frames: List[dict]  # [{timestamp, frame_data_url, label}]
    # Value returned by find_last_word_timestamp, or the last fallback
    # timestamp when the Whisper path fails.
    # NOTE(review): declared as a plain float; if find_last_word_timestamp
    # can return None, response validation would fail -- confirm.
    last_word_time: float
    # Duration reported by moviepy's VideoFileClip for the downloaded video.
    total_duration: float
def _read_video_duration(video_path: str) -> float:
    """Return the duration moviepy reports for *video_path*.

    The clip is always closed, even if reading the duration raises.
    """
    # Function-scope import, matching how this module already imports moviepy.
    from moviepy.editor import VideoFileClip

    clip = VideoFileClip(video_path)
    try:
        return clip.duration
    finally:
        clip.close()


def _extract_fallback_frames(video_path: str):
    """Return ``(frames, last_word_time)`` from fixed timestamps near the end.

    ``frames`` is a list of ``(timestamp, frame_data, label)`` tuples taken
    1.5 s, 1.0 s and 0.5 s before the end of the video; ``last_word_time`` is
    the last of those timestamps.
    """
    from utils.video_processor import extract_frame

    duration = _read_video_duration(video_path)
    frames = []
    for offset, label in ((1.5, "Early End"), (1.0, "Mid End"), (0.5, "Final Frame")):
        # Clamp at 0 so videos shorter than the offset still get a valid time.
        ts = max(0, duration - offset)
        frames.append((ts, extract_frame(video_path, ts, return_base64=True), label))
    return frames, frames[-1][0]


@router.post("/extract-frames", response_model=FrameExtractionResponse)
async def extract_frames_api(request: FrameExtractionRequest):
    """
    Extract transition frames using Whisper to detect last spoken word

    The video at ``request.video_url`` is downloaded into a private temporary
    directory. The Whisper helpers are tried first; if either of them raises,
    three fixed timestamps near the end of the video are used instead.

    Raises:
        HTTPException: 501 if Whisper is not available, 400 if the download
            does not return HTTP 200, 500 if both the Whisper path and the
            fallback fail or any other unexpected error occurs.
    """
    if not is_whisper_available():
        raise HTTPException(
            status_code=501,
            detail="Whisper not installed. Install with: pip install openai-whisper moviepy"
        )

    try:
        import httpx

        # A private temporary directory replaces the race-prone
        # tempfile.mktemp() and is removed on every exit path.
        with tempfile.TemporaryDirectory() as work_dir:
            temp_video = os.path.join(work_dir, "input.mp4")

            # NOTE(review): video_url is caller-supplied and fetched
            # server-side; consider restricting allowed hosts/schemes.
            async with httpx.AsyncClient() as client:
                response = await client.get(request.video_url)
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to download video: {response.status_code}"
                )
            with open(temp_video, 'wb') as f:
                f.write(response.content)

            # NOTE(review): the helpers below are called synchronously inside
            # an async handler, so the event loop is blocked while they run.
            try:
                # Prefer Whisper-based post-speech detection
                frames = extract_post_speech_frames(
                    temp_video,
                    request.script,
                    buffer_time=request.buffer_time,
                    num_frames=request.num_frames,
                    model_size=request.model_size
                )
                last_word_time = find_last_word_timestamp(
                    temp_video,
                    request.script,
                    model_size=request.model_size
                )
            except Exception as whisper_err:
                # Fallback: simple fixed timestamps near the end of the video
                print(f"⚠️ Whisper-based frame extraction failed: {whisper_err}")
                try:
                    # Replace (not extend) any frames the Whisper path already
                    # produced, so the two sources are never mixed.
                    frames, last_word_time = _extract_fallback_frames(temp_video)
                    print("✅ Returned fallback frames near video end.")
                except Exception as fallback_err:
                    print(f"❌ Fallback frame extraction failed: {fallback_err}")
                    raise HTTPException(
                        status_code=500,
                        detail=f"Frame extraction failed: {str(whisper_err)}"
                    ) from fallback_err

            duration = _read_video_duration(temp_video)

            # Format response
            frames_data = [
                {
                    "timestamp": timestamp,
                    "frame_data_url": frame_data,
                    "label": label
                }
                for timestamp, frame_data, label in frames
            ]
            return FrameExtractionResponse(
                frames=frames_data,
                last_word_time=last_word_time,
                total_duration=duration
            )
    except HTTPException:
        # Keep the deliberate status codes (400/500) instead of re-wrapping
        # them as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Frame extraction failed: {str(e)}"
        ) from e
@router.post("/trim-video")
async def trim_video_api(
    video_url: str = Form(...),
    script: str = Form(...),
    padding: float = Form(0.5),
    model_size: str = Form("base")
):
    """
    Trim video to end after last spoken word

    The video at ``video_url`` is downloaded into a private temporary
    directory, passed to ``trim_video_to_last_word`` and the resulting file is
    returned as an ``video/mp4`` attachment named ``trimmed_video.mp4``.

    Raises:
        HTTPException: 501 if Whisper is not available, 400 if the download
            does not return HTTP 200, 500 on any other failure.
    """
    if not is_whisper_available():
        raise HTTPException(
            status_code=501,
            detail="Whisper not installed. Install with: pip install openai-whisper moviepy"
        )

    try:
        import httpx
        from fastapi.responses import Response

        # A private temporary directory replaces the race-prone
        # tempfile.mktemp() calls; both files are removed on every exit path,
        # whether or not the output file was actually created.
        with tempfile.TemporaryDirectory() as work_dir:
            temp_video = os.path.join(work_dir, "input.mp4")
            output_video = os.path.join(work_dir, "input_trimmed.mp4")

            # NOTE(review): video_url is caller-supplied and fetched
            # server-side; consider restricting allowed hosts/schemes.
            async with httpx.AsyncClient() as client:
                response = await client.get(video_url)
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to download video: {response.status_code}"
                )
            with open(temp_video, 'wb') as f:
                f.write(response.content)

            # NOTE(review): called synchronously inside an async handler, so
            # the event loop is blocked while trimming runs.
            output_path = trim_video_to_last_word(
                temp_video,
                script,
                output_video,
                padding=padding,
                model_size=model_size
            )

            # Read the result before the temporary directory is removed.
            with open(output_path, 'rb') as f:
                video_data = f.read()

        return Response(
            content=video_data,
            media_type="video/mp4",
            headers={
                "Content-Disposition": "attachment; filename=trimmed_video.mp4"
            }
        )
    except HTTPException:
        # Keep the deliberate 400 for a failed download instead of
        # re-wrapping it as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Video trimming failed: {str(e)}"
        ) from e
@router.get("/whisper-status")
async def whisper_status():
    """
    Check if Whisper is available

    Returns:
        A dict with ``available`` (the result of ``is_whisper_available()``)
        and ``message`` (a confirmation, or the install hint when Whisper is
        not available).
    """
    # Probe once so ``available`` and ``message`` can never disagree.
    available = is_whisper_available()
    if available:
        message = "Whisper is available"
    else:
        message = "Install with: pip install openai-whisper moviepy"
    return {
        "available": available,
        "message": message
    }