from fastapi import FastAPI, HTTPException from pydantic import BaseModel import subprocess import base64 import os import uuid import shutil import whisper app = FastAPI() # Load Whisper Tiny once on startup print("Loading Whisper...") whisper_model = whisper.load_model("tiny") class VideoJsonRequest(BaseModel): video_base64: str def file_to_base64(filepath): if not os.path.exists(filepath): return None with open(filepath, "rb") as f: return base64.b64encode(f.read()).decode('utf-8') @app.post("/process-video") async def process_video(req: VideoJsonRequest): job_id = str(uuid.uuid4()) work_dir = f"/tmp/viralcat_{job_id}" os.makedirs(work_dir, exist_ok=True) video_path = os.path.join(work_dir, "video.mp4") audio_path = os.path.join(work_dir, "audio.wav") try: # 1. Decode Base64 string into actual MP4 file with open(video_path, "wb") as f: f.write(base64.b64decode(req.video_base64)) # 2. Get Duration probe = subprocess.run([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path ], capture_output=True, text=True, check=True) duration = float(probe.stdout.strip() or 0) # 3. Extract exactly 15 frames # fps = 15 / max(duration, 1) fps = 1 / max(duration, 1) subprocess.run([ "ffmpeg", "-y", "-i", video_path, "-vf", f"fps={fps}", "-vframes", "15", "-q:v", "4", f"{work_dir}/frame_%03d.jpg" ], check=True, capture_output=True) # 4. Extract & Transcribe Audio subprocess.run([ "ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path ], check=True, capture_output=True) transcript = "" if os.path.exists(audio_path): result = whisper_model.transcribe(audio_path) transcript = result["text"].strip() # 5. Gather frames frame_files = sorted([f for f in os.listdir(work_dir) if f.startswith("frame_")]) frames_b64 = [file_to_base64(os.path.join(work_dir, f)) for f in frame_files] return { "success": True, "transcript": transcript, "frames": frames_b64 } except Exception as e: return {"success": False, "error": str(e)} finally: if os.path.exists(work_dir): shutil.rmtree(work_dir, ignore_errors=True)