"""Viral Cat local processor.

A small FastAPI service that takes an uploaded video, extracts JPEG frames and
an MP3 audio track with ffmpeg, and returns them base64-encoded in JSON.
"""

import base64
import os
import shutil
import subprocess
import uuid

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

# Longest video (in seconds) accepted when the request is not flagged as pro.
_FREE_MAX_DURATION_S = 120
# Maximum number of frames returned in a single response.
_MAX_FRAMES = 80
# How many trailing characters of ffmpeg/ffprobe stderr to write to the log.
_STDERR_LOG_LIMIT = 2000


@app.get("/")
def greet_json():
    """Return a static status payload identifying the service."""
    return {"status": "online", "engine": "Viral Cat Local Processor"}


def file_to_base64(filepath):
    """Return the contents of *filepath* as a base64 string.

    Returns ``None`` when the file does not exist.
    """
    try:
        with open(filepath, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")
    except FileNotFoundError:
        return None


def _stderr_tail(exc):
    """Return the tail of a subprocess error's captured stderr, or ''."""
    stderr = getattr(exc, "stderr", None)
    if not stderr:
        return ""
    if isinstance(stderr, bytes):
        # The ffmpeg calls capture raw bytes; the ffprobe call captures text.
        stderr = stderr.decode("utf-8", errors="replace")
    return stderr.strip()[-_STDERR_LOG_LIMIT:]


def _process_upload(upload, fps, is_pro):
    """Run the blocking extraction pipeline for one uploaded video.

    Saves the upload into a per-request scratch directory, checks its
    duration, extracts frames and audio with ffmpeg, and base64-encodes the
    results. The scratch directory is always removed before returning.

    Args:
        upload: The uploaded file; its ``.file`` attribute is read as a
            binary stream.
        fps: Frames to extract per second of video; must be >= 1.
        is_pro: When true, the free-tier duration limit is not enforced.

    Returns:
        A dict with ``success`` true plus ``total_frames``, ``duration``,
        ``frames`` and ``audio`` on success, or ``success`` false plus an
        ``error`` string on any failure.
    """
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)

    video_path = os.path.join(work_dir, "video.mp4")
    audio_path = os.path.join(work_dir, "audio.mp3")

    try:
        # Reject nonsensical rates up front instead of letting ffmpeg fail
        # with an opaque "non-zero exit status" message.
        if fps < 1:
            raise ValueError(f"fps must be a positive integer, got {fps}.")

        # 1. Save the uploaded file locally
        with open(video_path, "wb") as buffer:
            shutil.copyfileobj(upload.file, buffer)

        # 2. Check duration (limit applies to non-pro requests only)
        probe = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                video_path,
            ],
            capture_output=True, text=True, check=True,
        )
        duration = float(probe.stdout.strip() or 0)

        if duration > _FREE_MAX_DURATION_S and not is_pro:
            raise ValueError(
                f"Video ({duration:.1f}s) exceeds {_FREE_MAX_DURATION_S}s limit."
            )

        # 3. Extract frames (`fps` per second). `-frames:v` stops ffmpeg once
        # the cap is reached, so long videos don't write frames to disk that
        # would be discarded below anyway.
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", video_path,
                "-vf", f"fps={fps}",
                "-frames:v", str(_MAX_FRAMES),
                "-q:v", "4",
                f"{work_dir}/frame_%04d.jpg",
            ],
            check=True, capture_output=True,
        )

        # 4. Extract audio (mono MP3)
        # NOTE(review): both -q:a (VBR quality) and -b:a (bitrate) are passed;
        # the encoder will likely honour only one of them - confirm which is
        # intended.
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", video_path,
                "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k",
                audio_path,
            ],
            check=True, capture_output=True,
        )

        # 5. Base64 conversion
        frame_files = sorted(
            name for name in os.listdir(work_dir)
            if name.startswith("frame_") and name.endswith(".jpg")
        )
        frame_files = frame_files[:_MAX_FRAMES]  # Safety cap

        frames_b64 = [
            file_to_base64(os.path.join(work_dir, name)) for name in frame_files
        ]
        audio_b64 = file_to_base64(audio_path)

        print(f"Done with video, {len(frames_b64)}")
        return {
            "success": True,
            "total_frames": len(frames_b64),
            "duration": duration,
            "frames": frames_b64,
            "audio": audio_b64,
        }

    except Exception as e:
        # Request boundary: every failure is reported to the client as a
        # `success: False` payload rather than an unhandled server error.
        print(f"ERROR: {str(e)}")
        detail = _stderr_tail(e)
        if detail:
            # capture_output=True hides ffmpeg's own diagnostics; surface them
            # in the server log so failures can be debugged.
            print(f"ERROR stderr: {detail}")
        return {"success": False, "error": str(e)}

    finally:
        # CLEANUP
        if os.path.exists(work_dir):
            shutil.rmtree(work_dir, ignore_errors=True)


@app.post("/process-video")
async def process_video(
    file: UploadFile = File(...),
    fps: int = Form(2),
    is_pro: bool = Form(False)
):
    """Extract base64 frames and audio from an uploaded video.

    Form fields:
        file: The video to process.
        fps: Frames to extract per second of video (default 2).
        is_pro: Skip the 120-second duration limit when true.

    Returns:
        On success, ``{"success": True, "total_frames", "duration", "frames",
        "audio"}``; on failure, ``{"success": False, "error"}``.
    """
    # The pipeline is entirely blocking (file copies and ffmpeg subprocesses),
    # so run it in a worker thread instead of stalling the event loop.
    return await run_in_threadpool(_process_upload, file, fps, is_pro)