"""Media server that downloads a video and returns sampled frames plus audio.

Exposes a health-check route and a ``/process-video`` endpoint that downloads
a video with yt-dlp, extracts JPEG frames and an MP3 track with ffmpeg, and
returns everything Base64-encoded in a single JSON response.
"""

import base64
import os
import shutil
import subprocess
import uuid

import yt_dlp
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


@app.get("/")
def greet_json():
    """Return a static payload showing that the server is up."""
    return {"Hello": "Viral Cat Media Server Running!"}


class VideoRequest(BaseModel):
    """Request body accepted by ``/process-video``."""

    # Address of the video handed to yt-dlp.
    url: str
    # Accepted in the request but not read by the processing code below.
    platform: str
    # When true, the 120-second duration limit is skipped.
    is_pro: bool = False
    # Frames sampled per second of video (passed to ffmpeg's ``fps`` filter).
    fps: int = 2


def file_to_base64(filepath: str) -> str:
    """Read *filepath* as binary and return its contents Base64-encoded."""
    with open(filepath, "rb") as f:
        return base64.b64encode(f.read()).decode('utf-8')


def _stderr_text(exc: BaseException) -> str:
    """Return the ``stderr`` carried by *exc* as text, or ``""`` if absent.

    ``subprocess.CalledProcessError.stderr`` is ``str`` when the process was
    run with ``text=True`` and ``bytes`` otherwise; both are handled so that
    building an error message can never itself raise.
    """
    stderr = getattr(exc, "stderr", None)
    if not stderr:
        return ""
    if isinstance(stderr, bytes):
        return stderr.decode("utf-8", errors="replace")
    return str(stderr)


@app.post("/process-video")
def process_video(req: VideoRequest):
    """Download a video and return its frames and audio as Base64 strings.

    Steps: download with yt-dlp, measure the duration with ffprobe, extract
    frames at ``req.fps`` frames per second, extract a mono MP3 track, then
    encode every artefact to Base64. The per-job working directory is always
    removed afterwards.

    Args:
        req: The parsed request body.

    Returns:
        A dict with the keys ``success``, ``duration``,
        ``total_frames_extracted``, ``frames``, ``audio`` and
        ``thumbnail_url``.

    Raises:
        HTTPException: 400 for an invalid ``fps``, a failed download, or a
            video longer than 120 seconds for non-Pro requests; 500 for any
            other failure (including non-zero exits from ffprobe/ffmpeg).
    """
    # Reject a non-positive frame rate before doing any expensive work;
    # ffmpeg would otherwise fail on it only after the download finished.
    if req.fps < 1:
        raise HTTPException(status_code=400, detail="fps must be a positive integer.")

    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)

    video_path = f"{work_dir}/video.mp4"
    audio_path = f"{work_dir}/audio.mp3"

    try:
        # 1. Download the video.
        # 'extractor_args' asks yt-dlp to use YouTube's Android player client,
        # which the original author added to get around blocking of cloud IPs.
        ydl_opts = {
            'format': 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best',
            'outtmpl': video_path,
            'quiet': True,
            'no_warnings': True,
            'nocheckcertificate': True,
            'extractor_args': {'youtube': ['player_client=android']},
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([req.url])

        # The download may land under a different name than requested; fall
        # back to whatever file ended up in the working directory.
        if not os.path.exists(video_path):
            files = os.listdir(work_dir)
            if files:
                video_path = os.path.join(work_dir, files[0])
            else:
                raise ValueError("Failed to download video file.")

        # 2. Check the duration (limited to 120 seconds unless Pro).
        probe = subprocess.run([
            "ffprobe", "-v", "error", "-show_entries",
            "format=duration", "-of",
            "default=noprint_wrappers=1:nokey=1", video_path
        ], capture_output=True, text=True, check=True)

        duration = float(probe.stdout.strip() or 0)

        if duration > 120 and not req.is_pro:
            raise ValueError(f"Video is {duration:.1f}s long. Free users are limited to 120 seconds.")

        # 3. Extract frames at the requested rate (e.g. 2 frames per second).
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={req.fps}",
            "-q:v", "2", f"{work_dir}/frame_%04d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract the full audio track as mono MP3.
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k", audio_path
        ], check=True, capture_output=True)

        # 5. Gather the artefacts and convert them to Base64. Sorting keeps
        # the frames in capture order thanks to the zero-padded file names.
        frame_files = sorted(
            f for f in os.listdir(work_dir)
            if f.startswith("frame_") and f.endswith(".jpg")
        )
        frames_b64 = [file_to_base64(os.path.join(work_dir, f)) for f in frame_files]
        audio_b64 = file_to_base64(audio_path) if os.path.exists(audio_path) else None

        return {
            "success": True,
            "duration": duration,
            "total_frames_extracted": len(frames_b64),
            "frames": frames_b64,
            "audio": audio_b64,
            "thumbnail_url": "https://placehold.co/400x600/png"
        }

    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as e:
        error_detail = str(e)
        # stderr may be str (ffprobe ran with text=True) or bytes (ffmpeg);
        # the helper normalises both so the handler itself cannot crash.
        stderr_text = _stderr_text(e)
        if stderr_text:
            error_detail += f" | {stderr_text}"
        raise HTTPException(status_code=500, detail=error_detail) from e
    finally:
        # Always remove the per-job directory so temporary files do not pile up.
        if os.path.exists(work_dir):
            shutil.rmtree(work_dir, ignore_errors=True)
            print(f"[CLEANUP] Deleted temporary folder: {work_dir}")