Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import subprocess | |
| import base64 | |
| import os | |
| import uuid | |
| import shutil | |
| import yt_dlp | |
| app = FastAPI() | |
def greet_json():
    """Return the static JSON payload announcing that the server is running.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` as a health-check
    endpoint -- confirm against the deployed file.
    """
    return {"Hello": "Viral Cat Media Server Running!"}
class VideoRequest(BaseModel):
    """Request body describing a single video-processing job."""

    # Source video URL; passed unchanged to yt-dlp for download.
    url: str
    # Platform label supplied by the client. It is required by the schema but
    # is not read by process_video in this view.
    platform: str
    # When True, the 120-second duration cap for free users is skipped.
    is_pro: bool = False
    # Frames extracted per second of video (value of ffmpeg's ``fps`` filter).
    fps: int = 2
def file_to_base64(filepath: str) -> str:
    """Read *filepath* as raw bytes and return them Base64-encoded.

    Args:
        filepath: Path of the file to read; opened in binary mode.

    Returns:
        The file's full contents as a Base64 string (empty string for an
        empty file).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(filepath, "rb") as f:
        return base64.b64encode(f.read()).decode('utf-8')
def _format_error(exc: Exception) -> str:
    """Return ``str(exc)`` with any captured subprocess stderr appended as text."""
    detail = str(exc)
    stderr = getattr(exc, "stderr", None)
    if stderr:
        # CalledProcessError.stderr is bytes for the ffmpeg calls but already
        # str for the ffprobe call (text=True). Calling .decode() blindly on
        # the latter would raise inside the error handler and hide the cause.
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        detail += f" | {stderr}"
    return detail


def process_video(req: VideoRequest):
    """Download a video, then return its frames and audio as Base64 strings.

    Steps: download with yt-dlp into a per-job temp directory, measure the
    duration with ffprobe, extract JPEG frames at ``req.fps`` frames per
    second, extract a mono MP3 audio track, and Base64-encode the results.
    The temp directory is always removed before returning or raising.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` as a POST endpoint --
    confirm against the deployed file.

    Args:
        req: Job description (URL, pro flag, frame rate).

    Returns:
        A dict with keys ``success``, ``duration``,
        ``total_frames_extracted``, ``frames``, ``audio`` and
        ``thumbnail_url``.

    Raises:
        HTTPException: 400 for any ``ValueError`` (invalid ``fps``, failed
            download, duration over the free-tier limit); 500 for every
            other failure, with captured ffmpeg/ffprobe stderr appended.
    """
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)

    video_path = f"{work_dir}/video.mp4"
    audio_path = f"{work_dir}/audio.mp3"

    try:
        # Reject an unusable frame rate before spending time on the download;
        # ffmpeg's fps filter would otherwise fail later with an opaque 500.
        if req.fps <= 0:
            raise ValueError("fps must be a positive integer.")

        # 1. Download video
        ydl_opts = {
            'format': 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best',
            'outtmpl': video_path,
            'quiet': True,
            'no_warnings': True,
            # SECURITY NOTE(review): this disables TLS certificate
            # verification for downloads -- confirm it is really required.
            'nocheckcertificate': True,
            # Ask the YouTube extractor to use the Android player client.
            # The Python API expects a dict of lists per extractor; the
            # CLI-style 'player_client=android' string inside a list is not
            # the documented shape.
            'extractor_args': {'youtube': {'player_client': ['android']}},
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([req.url])

        # yt-dlp may write under a different name than requested; fall back
        # to whatever landed in the work dir (sorted for a stable choice).
        if not os.path.exists(video_path):
            files = sorted(os.listdir(work_dir))
            if files:
                video_path = os.path.join(work_dir, files[0])
            else:
                raise ValueError("Failed to download video file.")

        # 2. Check duration (free users are limited to 120 seconds)
        probe = subprocess.run([
            "ffprobe", "-v", "error", "-show_entries",
            "format=duration", "-of", "default=noprint_wrappers=1:nokey=1",
            video_path
        ], capture_output=True, text=True, check=True)

        duration = float(probe.stdout.strip() or 0)

        if duration > 120 and not req.is_pro:
            raise ValueError(f"Video is {duration:.1f}s long. Free users are limited to 120 seconds.")

        # 3. Frame extraction at the requested frames-per-second rate
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={req.fps}",
            "-q:v", "2", f"{work_dir}/frame_%04d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract the full audio track as mono MP3
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k", audio_path
        ], check=True, capture_output=True)

        # 5. Gather outputs and convert to Base64. Zero-padded frame names
        # make the lexicographic sort match chronological order.
        frame_files = sorted(
            f for f in os.listdir(work_dir)
            if f.startswith("frame_") and f.endswith(".jpg")
        )
        frames_b64 = [file_to_base64(os.path.join(work_dir, f)) for f in frame_files]
        audio_b64 = file_to_base64(audio_path) if os.path.exists(audio_path) else None

        return {
            "success": True,
            "duration": duration,
            "total_frames_extracted": len(frames_b64),
            "frames": frames_b64,
            "audio": audio_b64,
            "thumbnail_url": "https://placehold.co/400x600/png"
        }

    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=_format_error(e)) from e
    finally:
        # Always remove the per-job directory so temp files do not pile up.
        if os.path.exists(work_dir):
            shutil.rmtree(work_dir, ignore_errors=True)
            print(f"[CLEANUP] Deleted temporary folder: {work_dir}")